Skip to content

Commit

Permalink
[fix](user manager) Add rwlock for userManager (#34699)
Browse files Browse the repository at this point in the history
* [fix](user manager) Add rwlock for userManager

fix
```
2024-05-10 14:33:55,005 mysql-nio-pool-67 ERROR An exception occurred processing Appender Sys java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1597)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1630)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1628)
at org.apache.logging.log4j.message.ParameterFormatter.appendMap(ParameterFormatter.java:581)
at org.apache.logging.log4j.message.ParameterFormatter.appendPotentiallyRecursiveValue(ParameterFormatter.java:512)
at org.apache.logging.log4j.message.ParameterFormatter.recursiveDeepToString(ParameterFormatter.java:477)
at org.apache.logging.log4j.message.ParameterFormatter.recursiveDeepToString(ParameterFormatter.java:448)
at org.apache.logging.log4j.message.ParameterFormatter.formatMessage2(ParameterFormatter.java:191)
at org.apache.logging.log4j.message.ParameterizedMessage.formatTo(ParameterizedMessage.java:227)
at org.apache.logging.log4j.core.pattern.MessagePatternConverter$SimpleMessagePatternConverter.format(MessagePatternConverter.java:121)
at org.apache.logging.log4j.core.layout.PatternLayout$NoFormatPatternSerializer.toSerializable(PatternLayout.java:342)
at org.apache.logging.log4j.core.layout.PatternLayout.toText(PatternLayout.java:240)
at org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:225)
at org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:59)
at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.directEncodeEvent(AbstractOutputStreamAppender.java:215)
at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.tryAppend(AbstractOutputStreamAppender.java:208)
at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.append(AbstractOutputStreamAppender.java:199)
at org.apache.logging.log4j.core.appender.RollingFileAppender.append(RollingFileAppender.java:301)
at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:161)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:134)
at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:125)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:89)
at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:675)
at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:633)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:616)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:552)
at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
at org.apache.logging.log4j.core.Logger.log(Logger.java:161)
at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2205)
at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2159)
at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2142)
at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2040)
at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1907)
at org.apache.logging.log4j.spi.AbstractLogger.warn(AbstractLogger.java:2789)
at org.apache.doris.mysql.privilege.UserManager.getUserId(UserManager.java:348)
at org.apache.doris.mysql.privilege.Auth.getUserId(Auth.java:1950)
at org.apache.doris.analysis.CreateUserStmt.<init>(CreateUserStmt.java:94)
at org.apache.doris.analysis.CUP$SqlParser$actions.case209(SqlParser.java:51399)
at org.apache.doris.analysis.CUP$SqlParser$actions.CUP$SqlParser$do_action(SqlParser.java:19433)
at org.apache.doris.analysis.SqlParser.do_action(SqlParser.java:2972)
at java_cup.runtime.lr_parser.parse(lr_parser.java:584)
at org.apache.doris.common.util.SqlParserUtils.getMultiStmts(SqlParserUtils.java:60)
at org.apache.doris.qe.ConnectProcessor.parse(ConnectProcessor.java:516)
at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:296)
at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:232)
at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:177)
```

* add log

* log

* fix

* fix again

* fix
  • Loading branch information
deardeng authored and dataroaring committed May 26, 2024
1 parent 9a0116c commit 5b0f2af
Showing 1 changed file with 179 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.apache.doris.common.PatternMatcherException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
Expand All @@ -44,22 +46,38 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;

public class UserManager implements Writable, GsonPostProcessable {
public static final String ANY_HOST = "%";
private static final Logger LOG = LogManager.getLogger(UserManager.class);
// Concurrency control is delegated by Auth, so not concurrentMap

private static final QueryableReentrantReadWriteLock rwLock = new QueryableReentrantReadWriteLock(false);
private static final Lock rlock = rwLock.readLock();
private static final Lock wlock = rwLock.writeLock();

// One name may have multiple User,because host can be different
@SerializedName(value = "nameToUsers")
private Map<String, List<User>> nameToUsers = Maps.newHashMap();

public boolean userIdentityExist(UserIdentity userIdentity, boolean includeByDomain) {
rlock.lock();
try {
return userIdentityExistWithoutLock(userIdentity, includeByDomain);
} finally {
rlock.unlock();
}
}

public boolean userIdentityExistWithoutLock(UserIdentity userIdentity, boolean includeByDomain) {
List<User> users = nameToUsers.get(userIdentity.getQualifiedUser());
if (CollectionUtils.isEmpty(users)) {
return false;
Expand All @@ -75,8 +93,13 @@ public boolean userIdentityExist(UserIdentity userIdentity, boolean includeByDom
}

public List<User> getUserByName(String name) {
List<User> users = nameToUsers.get(name);
return users == null ? Collections.EMPTY_LIST : users;
rlock.lock();
try {
List<User> users = nameToUsers.get(name);
return users == null ? Collections.EMPTY_LIST : users;
} finally {
rlock.unlock();
}
}

public void checkPassword(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString,
Expand All @@ -92,10 +115,16 @@ public void checkPlainPassword(String remoteUser, String remoteHost, String remo
private void checkPasswordInternal(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString,
String remotePasswdStr, List<UserIdentity> currentUser, boolean plain) throws AuthenticationException {
PasswordPolicyManager passwdPolicyMgr = Env.getCurrentEnv().getAuth().getPasswdPolicyManager();
List<User> users = nameToUsers.get(remoteUser);
if (CollectionUtils.isEmpty(users)) {
throw new AuthenticationException(ErrorCode.ERR_ACCESS_DENIED_ERROR, remoteUser + "@" + remoteHost,
List<User> users = new ArrayList<>();
rlock.lock();
try {
users = nameToUsers.get(remoteUser);
if (CollectionUtils.isEmpty(users)) {
throw new AuthenticationException(ErrorCode.ERR_ACCESS_DENIED_ERROR, remoteUser + "@" + remoteHost,
"YES");
}
} finally {
rlock.unlock();
}

for (User user : users) {
Expand Down Expand Up @@ -131,13 +160,19 @@ private void checkPasswordInternal(String remoteUser, String remoteHost, byte[]

public List<UserIdentity> getUserIdentityUncheckPasswd(String remoteUser, String remoteHost) {
List<UserIdentity> userIdentities = Lists.newArrayList();
List<User> users = nameToUsers.getOrDefault(remoteUser, Lists.newArrayList());
for (User user : users) {
if (!user.getUserIdentity().isDomain() && (user.isAnyHost() || user.getHostPattern().match(remoteHost))) {
userIdentities.add(user.getUserIdentity());
rlock.lock();
try {
List<User> users = nameToUsers.getOrDefault(remoteUser, Lists.newArrayList());
for (User user : users) {
if (!user.getUserIdentity().isDomain()
&& (user.isAnyHost() || user.getHostPattern().match(remoteHost))) {
userIdentities.add(user.getUserIdentity());
}
}
return userIdentities;
} finally {
rlock.unlock();
}
return userIdentities;
}

private String hasRemotePasswd(boolean plain, byte[] remotePasswd) {
Expand All @@ -163,30 +198,44 @@ private boolean comparePassword(Password curUserPassword, byte[] remotePasswd,


public void clearEntriesSetByResolver() {
Iterator<Entry<String, List<User>>> iterator = nameToUsers.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, List<User>> next = iterator.next();
Iterator<User> iter = next.getValue().iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.isSetByDomainResolver()) {
iter.remove();
wlock.lock();
try {
Iterator<Entry<String, List<User>>> iterator = nameToUsers.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, List<User>> next = iterator.next();
Iterator<User> iter = next.getValue().iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.isSetByDomainResolver()) {
iter.remove();
}
}
if (CollectionUtils.isEmpty(next.getValue())) {
iterator.remove();
} else {
Collections.sort(next.getValue());
}
}
if (CollectionUtils.isEmpty(next.getValue())) {
iterator.remove();
} else {
Collections.sort(next.getValue());
}
} finally {
wlock.unlock();
}

}

public User createUser(UserIdentity userIdent, byte[] pwd, UserIdentity domainUserIdent, boolean setByResolver,
String comment)
String comment) throws PatternMatcherException {
wlock.lock();
try {
return createUserWithoutLock(userIdent, pwd, domainUserIdent, setByResolver, comment);
} finally {
wlock.unlock();
}
}

public User createUserWithoutLock(UserIdentity userIdent, byte[] pwd, UserIdentity domainUserIdent,
boolean setByResolver, String comment)
throws PatternMatcherException {
if (userIdentityExist(userIdent, true)) {
User userByUserIdentity = getUserByUserIdentity(userIdent);
if (userIdentityExistWithoutLock(userIdent, true)) {
User userByUserIdentity = getUserByUserIdentityWithoutLock(userIdent);
if (!userByUserIdentity.isSetByDomainResolver() && setByResolver) {
// If the user is NOT created by domain resolver,
// and the current operation is done by DomainResolver,
Expand All @@ -211,9 +260,19 @@ public User createUser(UserIdentity userIdent, byte[] pwd, UserIdentity domainUs
Collections.sort(nameToLists);
}
return user;

}

public User getUserByUserIdentity(UserIdentity userIdent) {
rlock.lock();
try {
return getUserByUserIdentityWithoutLock(userIdent);
} finally {
rlock.unlock();
}
}

public User getUserByUserIdentityWithoutLock(UserIdentity userIdent) {
List<User> nameToLists = nameToUsers.get(userIdent.getQualifiedUser());
if (CollectionUtils.isEmpty(nameToLists)) {
return null;
Expand All @@ -229,26 +288,36 @@ public User getUserByUserIdentity(UserIdentity userIdent) {
}

public void removeUser(UserIdentity userIdent) {
List<User> nameToLists = nameToUsers.get(userIdent.getQualifiedUser());
if (CollectionUtils.isEmpty(nameToLists)) {
return;
}
Iterator<User> iter = nameToLists.iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.getUserIdentity().equals(userIdent)) {
iter.remove();
wlock.lock();
try {
List<User> nameToLists = nameToUsers.get(userIdent.getQualifiedUser());
if (CollectionUtils.isEmpty(nameToLists)) {
return;
}
}
if (CollectionUtils.isEmpty(nameToLists)) {
nameToUsers.remove(userIdent.getQualifiedUser());
} else {
Collections.sort(nameToLists);
Iterator<User> iter = nameToLists.iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.getUserIdentity().equals(userIdent)) {
iter.remove();
}
}
if (CollectionUtils.isEmpty(nameToLists)) {
nameToUsers.remove(userIdent.getQualifiedUser());
} else {
Collections.sort(nameToLists);
}
} finally {
wlock.unlock();
}
}

public Map<String, List<User>> getNameToUsers() {
return nameToUsers;
rlock.lock();
try {
return ImmutableMap.copyOf(nameToUsers);
} finally {
rlock.unlock();
}
}

public void setPassword(UserIdentity userIdentity, byte[] password, boolean errOnNonExist) throws DdlException {
Expand All @@ -263,38 +332,48 @@ public void setPassword(UserIdentity userIdentity, byte[] password, boolean errO
}

public void getAllDomains(Set<String> allDomains) {
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
for (User user : entry.getValue()) {
if (user.getUserIdentity().isDomain()) {
allDomains.add(user.getUserIdentity().getHost());
rlock.lock();
try {
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
for (User user : entry.getValue()) {
if (user.getUserIdentity().isDomain()) {
allDomains.add(user.getUserIdentity().getHost());
}
}
}
} finally {
rlock.unlock();
}
}

// handle new resolved IPs.
// it will only modify password entry of these resolved IPs. All other privileges are binded
// to the domain, so no need to modify.
public void addUserPrivEntriesByResolvedIPs(Map<String, Set<String>> resolvedIPsMap) {
for (Entry<String, List<User>> userEntry : nameToUsers.entrySet()) {
for (Map.Entry<String, Set<String>> entry : resolvedIPsMap.entrySet()) {
User domainUser = getDomainUser(userEntry.getValue(), entry.getKey());
if (domainUser == null) {
continue;
}
// this user ident will be saved along with each resolved "IP" user ident, so that when checking
// password, this "domain" user ident will be returned as "current user".
for (String newIP : entry.getValue()) {
UserIdentity userIdent = UserIdentity.createAnalyzedUserIdentWithIp(userEntry.getKey(), newIP);
byte[] password = domainUser.getPassword().getPassword();
Preconditions.checkNotNull(password, entry.getKey());
try {
createUser(userIdent, password, domainUser.getUserIdentity(), true, "");
} catch (PatternMatcherException e) {
LOG.info("failed to create user for user ident: {}, {}", userIdent, e.getMessage());
wlock.lock();
try {
for (Entry<String, List<User>> userEntry : nameToUsers.entrySet()) {
for (Map.Entry<String, Set<String>> entry : resolvedIPsMap.entrySet()) {
User domainUser = getDomainUser(userEntry.getValue(), entry.getKey());
if (domainUser == null) {
continue;
}
// this user ident will be saved along with each resolved "IP" user ident, so that when checking
// password, this "domain" user ident will be returned as "current user".
for (String newIP : entry.getValue()) {
UserIdentity userIdent = UserIdentity.createAnalyzedUserIdentWithIp(userEntry.getKey(), newIP);
byte[] password = domainUser.getPassword().getPassword();
Preconditions.checkNotNull(password, entry.getKey());
try {
createUserWithoutLock(userIdent, password, domainUser.getUserIdentity(), true, "");
} catch (PatternMatcherException e) {
LOG.info("failed to create user for user ident: {}, {}", userIdent, e.getMessage());
}
}
}
}
} finally {
wlock.unlock();
}
}

Expand All @@ -309,7 +388,12 @@ private User getDomainUser(List<User> users, String domain) {

@Override
public String toString() {
return nameToUsers.toString();
rlock.lock();
try {
return nameToUsers.toString();
} finally {
rlock.unlock();
}
}

@Override
Expand All @@ -326,11 +410,16 @@ public static UserManager read(DataInput in) throws IOException {
// should be removed after version 3.0
private void removeClusterPrefix() {
Map<String, List<User>> newNameToUsers = Maps.newHashMap();
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
String user = entry.getKey();
newNameToUsers.put(ClusterNamespace.getNameFromFullName(user), entry.getValue());
wlock.lock();
try {
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
String user = entry.getKey();
newNameToUsers.put(ClusterNamespace.getNameFromFullName(user), entry.getValue());
}
this.nameToUsers = newNameToUsers;
} finally {
wlock.unlock();
}
this.nameToUsers = newNameToUsers;
}

@Override
Expand All @@ -340,22 +429,32 @@ public void gsonPostProcess() throws IOException {

// ====== CLOUD ======
public Set<String> getAllUsers() {
return nameToUsers.keySet();
rlock.lock();
try {
return new HashSet<>(nameToUsers.keySet());
} finally {
rlock.unlock();
}
}

public String getUserId(String userName) {
if (!nameToUsers.containsKey(userName)) {
LOG.warn("can't find userName {} 's userId, nameToUsers {}", userName, nameToUsers);
return "";
}
List<User> users = nameToUsers.get(userName);
if (users.isEmpty()) {
LOG.warn("userName {} empty users in map {}", userName, nameToUsers);
rlock.lock();
try {
if (!nameToUsers.containsKey(userName)) {
LOG.warn("can't find userName {} 's userId, nameToUsers {}", userName, nameToUsers);
return "";
}
List<User> users = nameToUsers.get(userName);
if (users.isEmpty()) {
LOG.warn("userName {} empty users in map {}", userName, nameToUsers);
}
// here, all the users has same userid, just return one
String userId = users.stream().map(User::getUserId).filter(Strings::isNotEmpty).findFirst().orElse("");
LOG.debug("userName {}, userId {}, map {}", userName, userId, nameToUsers);
return userId;
} finally {
rlock.unlock();
}
// here, all the users has same userid, just return one
String userId = users.stream().map(User::getUserId).filter(Strings::isNotEmpty).findFirst().orElse("");
LOG.debug("userName {}, userId {}, map {}", userName, userId, nameToUsers);
return userId;
}

// ====== CLOUD =====
Expand Down

0 comments on commit 5b0f2af

Please sign in to comment.