Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](user manager) Add rwlock for userManager #34699

Merged
merged 6 commits into from
May 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.apache.doris.common.PatternMatcherException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
Expand All @@ -44,22 +46,38 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;

public class UserManager implements Writable, GsonPostProcessable {
public static final String ANY_HOST = "%";
private static final Logger LOG = LogManager.getLogger(UserManager.class);
// Concurrency control is delegated by Auth, so not concurrentMap

private static final QueryableReentrantReadWriteLock rwLock = new QueryableReentrantReadWriteLock(false);
private static final Lock rlock = rwLock.readLock();
private static final Lock wlock = rwLock.writeLock();

// One name may have multiple User,because host can be different
@SerializedName(value = "nameToUsers")
private Map<String, List<User>> nameToUsers = Maps.newHashMap();

public boolean userIdentityExist(UserIdentity userIdentity, boolean includeByDomain) {
rlock.lock();
try {
return userIdentityExistWithoutLock(userIdentity, includeByDomain);
} finally {
rlock.unlock();
}
}

public boolean userIdentityExistWithoutLock(UserIdentity userIdentity, boolean includeByDomain) {
List<User> users = nameToUsers.get(userIdentity.getQualifiedUser());
if (CollectionUtils.isEmpty(users)) {
return false;
Expand All @@ -75,8 +93,13 @@ public boolean userIdentityExist(UserIdentity userIdentity, boolean includeByDom
}

public List<User> getUserByName(String name) {
List<User> users = nameToUsers.get(name);
return users == null ? Collections.EMPTY_LIST : users;
rlock.lock();
try {
List<User> users = nameToUsers.get(name);
return users == null ? Collections.EMPTY_LIST : users;
} finally {
rlock.unlock();
}
}

public void checkPassword(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString,
Expand All @@ -92,10 +115,16 @@ public void checkPlainPassword(String remoteUser, String remoteHost, String remo
private void checkPasswordInternal(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString,
String remotePasswdStr, List<UserIdentity> currentUser, boolean plain) throws AuthenticationException {
PasswordPolicyManager passwdPolicyMgr = Env.getCurrentEnv().getAuth().getPasswdPolicyManager();
List<User> users = nameToUsers.get(remoteUser);
if (CollectionUtils.isEmpty(users)) {
throw new AuthenticationException(ErrorCode.ERR_ACCESS_DENIED_ERROR, remoteUser + "@" + remoteHost,
List<User> users = new ArrayList<>();
rlock.lock();
try {
users = nameToUsers.get(remoteUser);
if (CollectionUtils.isEmpty(users)) {
throw new AuthenticationException(ErrorCode.ERR_ACCESS_DENIED_ERROR, remoteUser + "@" + remoteHost,
"YES");
}
} finally {
rlock.unlock();
}

for (User user : users) {
Expand Down Expand Up @@ -131,13 +160,19 @@ private void checkPasswordInternal(String remoteUser, String remoteHost, byte[]

public List<UserIdentity> getUserIdentityUncheckPasswd(String remoteUser, String remoteHost) {
List<UserIdentity> userIdentities = Lists.newArrayList();
List<User> users = nameToUsers.getOrDefault(remoteUser, Lists.newArrayList());
for (User user : users) {
if (!user.getUserIdentity().isDomain() && (user.isAnyHost() || user.getHostPattern().match(remoteHost))) {
userIdentities.add(user.getUserIdentity());
rlock.lock();
try {
List<User> users = nameToUsers.getOrDefault(remoteUser, Lists.newArrayList());
for (User user : users) {
if (!user.getUserIdentity().isDomain()
&& (user.isAnyHost() || user.getHostPattern().match(remoteHost))) {
userIdentities.add(user.getUserIdentity());
}
}
return userIdentities;
} finally {
rlock.unlock();
}
return userIdentities;
}

private String hasRemotePasswd(boolean plain, byte[] remotePasswd) {
Expand All @@ -163,30 +198,44 @@ private boolean comparePassword(Password curUserPassword, byte[] remotePasswd,


public void clearEntriesSetByResolver() {
Iterator<Entry<String, List<User>>> iterator = nameToUsers.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, List<User>> next = iterator.next();
Iterator<User> iter = next.getValue().iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.isSetByDomainResolver()) {
iter.remove();
wlock.lock();
try {
Iterator<Entry<String, List<User>>> iterator = nameToUsers.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, List<User>> next = iterator.next();
Iterator<User> iter = next.getValue().iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.isSetByDomainResolver()) {
iter.remove();
}
}
if (CollectionUtils.isEmpty(next.getValue())) {
iterator.remove();
} else {
Collections.sort(next.getValue());
}
}
if (CollectionUtils.isEmpty(next.getValue())) {
iterator.remove();
} else {
Collections.sort(next.getValue());
}
} finally {
wlock.unlock();
}

}

public User createUser(UserIdentity userIdent, byte[] pwd, UserIdentity domainUserIdent, boolean setByResolver,
String comment)
String comment) throws PatternMatcherException {
wlock.lock();
try {
return createUserWithoutLock(userIdent, pwd, domainUserIdent, setByResolver, comment);
} finally {
wlock.unlock();
}
}

public User createUserWithoutLock(UserIdentity userIdent, byte[] pwd, UserIdentity domainUserIdent,
boolean setByResolver, String comment)
throws PatternMatcherException {
if (userIdentityExist(userIdent, true)) {
User userByUserIdentity = getUserByUserIdentity(userIdent);
if (userIdentityExistWithoutLock(userIdent, true)) {
User userByUserIdentity = getUserByUserIdentityWithoutLock(userIdent);
if (!userByUserIdentity.isSetByDomainResolver() && setByResolver) {
// If the user is NOT created by domain resolver,
// and the current operation is done by DomainResolver,
Expand All @@ -211,9 +260,19 @@ public User createUser(UserIdentity userIdent, byte[] pwd, UserIdentity domainUs
Collections.sort(nameToLists);
}
return user;

}

public User getUserByUserIdentity(UserIdentity userIdent) {
rlock.lock();
try {
return getUserByUserIdentityWithoutLock(userIdent);
} finally {
rlock.unlock();
}
}

public User getUserByUserIdentityWithoutLock(UserIdentity userIdent) {
List<User> nameToLists = nameToUsers.get(userIdent.getQualifiedUser());
if (CollectionUtils.isEmpty(nameToLists)) {
return null;
Expand All @@ -229,26 +288,36 @@ public User getUserByUserIdentity(UserIdentity userIdent) {
}

public void removeUser(UserIdentity userIdent) {
List<User> nameToLists = nameToUsers.get(userIdent.getQualifiedUser());
if (CollectionUtils.isEmpty(nameToLists)) {
return;
}
Iterator<User> iter = nameToLists.iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.getUserIdentity().equals(userIdent)) {
iter.remove();
wlock.lock();
try {
List<User> nameToLists = nameToUsers.get(userIdent.getQualifiedUser());
if (CollectionUtils.isEmpty(nameToLists)) {
return;
}
}
if (CollectionUtils.isEmpty(nameToLists)) {
nameToUsers.remove(userIdent.getQualifiedUser());
} else {
Collections.sort(nameToLists);
Iterator<User> iter = nameToLists.iterator();
while (iter.hasNext()) {
User user = iter.next();
if (user.getUserIdentity().equals(userIdent)) {
iter.remove();
}
}
if (CollectionUtils.isEmpty(nameToLists)) {
nameToUsers.remove(userIdent.getQualifiedUser());
} else {
Collections.sort(nameToLists);
}
} finally {
wlock.unlock();
}
}

public Map<String, List<User>> getNameToUsers() {
return nameToUsers;
rlock.lock();
try {
return ImmutableMap.copyOf(nameToUsers);
} finally {
rlock.unlock();
}
}

public void setPassword(UserIdentity userIdentity, byte[] password, boolean errOnNonExist) throws DdlException {
Expand All @@ -263,38 +332,48 @@ public void setPassword(UserIdentity userIdentity, byte[] password, boolean errO
}

public void getAllDomains(Set<String> allDomains) {
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
for (User user : entry.getValue()) {
if (user.getUserIdentity().isDomain()) {
allDomains.add(user.getUserIdentity().getHost());
rlock.lock();
try {
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
for (User user : entry.getValue()) {
if (user.getUserIdentity().isDomain()) {
allDomains.add(user.getUserIdentity().getHost());
}
}
}
} finally {
rlock.unlock();
}
}

// handle new resolved IPs.
// it will only modify password entry of these resolved IPs. All other privileges are binded
// to the domain, so no need to modify.
public void addUserPrivEntriesByResolvedIPs(Map<String, Set<String>> resolvedIPsMap) {
for (Entry<String, List<User>> userEntry : nameToUsers.entrySet()) {
for (Map.Entry<String, Set<String>> entry : resolvedIPsMap.entrySet()) {
User domainUser = getDomainUser(userEntry.getValue(), entry.getKey());
if (domainUser == null) {
continue;
}
// this user ident will be saved along with each resolved "IP" user ident, so that when checking
// password, this "domain" user ident will be returned as "current user".
for (String newIP : entry.getValue()) {
UserIdentity userIdent = UserIdentity.createAnalyzedUserIdentWithIp(userEntry.getKey(), newIP);
byte[] password = domainUser.getPassword().getPassword();
Preconditions.checkNotNull(password, entry.getKey());
try {
createUser(userIdent, password, domainUser.getUserIdentity(), true, "");
} catch (PatternMatcherException e) {
LOG.info("failed to create user for user ident: {}, {}", userIdent, e.getMessage());
wlock.lock();
try {
for (Entry<String, List<User>> userEntry : nameToUsers.entrySet()) {
for (Map.Entry<String, Set<String>> entry : resolvedIPsMap.entrySet()) {
User domainUser = getDomainUser(userEntry.getValue(), entry.getKey());
if (domainUser == null) {
continue;
}
// this user ident will be saved along with each resolved "IP" user ident, so that when checking
// password, this "domain" user ident will be returned as "current user".
for (String newIP : entry.getValue()) {
UserIdentity userIdent = UserIdentity.createAnalyzedUserIdentWithIp(userEntry.getKey(), newIP);
byte[] password = domainUser.getPassword().getPassword();
Preconditions.checkNotNull(password, entry.getKey());
try {
createUserWithoutLock(userIdent, password, domainUser.getUserIdentity(), true, "");
} catch (PatternMatcherException e) {
LOG.info("failed to create user for user ident: {}, {}", userIdent, e.getMessage());
}
}
}
}
} finally {
wlock.unlock();
}
}

Expand All @@ -309,7 +388,12 @@ private User getDomainUser(List<User> users, String domain) {

@Override
public String toString() {
return nameToUsers.toString();
rlock.lock();
try {
return nameToUsers.toString();
} finally {
rlock.unlock();
}
}

@Override
Expand All @@ -326,11 +410,16 @@ public static UserManager read(DataInput in) throws IOException {
// should be removed after version 3.0
private void removeClusterPrefix() {
Map<String, List<User>> newNameToUsers = Maps.newHashMap();
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
String user = entry.getKey();
newNameToUsers.put(ClusterNamespace.getNameFromFullName(user), entry.getValue());
wlock.lock();
try {
for (Entry<String, List<User>> entry : nameToUsers.entrySet()) {
String user = entry.getKey();
newNameToUsers.put(ClusterNamespace.getNameFromFullName(user), entry.getValue());
}
this.nameToUsers = newNameToUsers;
} finally {
wlock.unlock();
}
this.nameToUsers = newNameToUsers;
}

@Override
Expand All @@ -340,22 +429,32 @@ public void gsonPostProcess() throws IOException {

// ====== CLOUD ======
public Set<String> getAllUsers() {
return nameToUsers.keySet();
rlock.lock();
try {
return new HashSet<>(nameToUsers.keySet());
} finally {
rlock.unlock();
}
}

public String getUserId(String userName) {
if (!nameToUsers.containsKey(userName)) {
LOG.warn("can't find userName {} 's userId, nameToUsers {}", userName, nameToUsers);
return "";
}
List<User> users = nameToUsers.get(userName);
if (users.isEmpty()) {
LOG.warn("userName {} empty users in map {}", userName, nameToUsers);
rlock.lock();
try {
if (!nameToUsers.containsKey(userName)) {
LOG.warn("can't find userName {} 's userId, nameToUsers {}", userName, nameToUsers);
return "";
}
List<User> users = nameToUsers.get(userName);
if (users.isEmpty()) {
LOG.warn("userName {} empty users in map {}", userName, nameToUsers);
}
// here, all the users has same userid, just return one
String userId = users.stream().map(User::getUserId).filter(Strings::isNotEmpty).findFirst().orElse("");
LOG.debug("userName {}, userId {}, map {}", userName, userId, nameToUsers);
return userId;
} finally {
rlock.unlock();
}
// here, all the users has same userid, just return one
String userId = users.stream().map(User::getUserId).filter(Strings::isNotEmpty).findFirst().orElse("");
LOG.debug("userName {}, userId {}, map {}", userName, userId, nameToUsers);
return userId;
}

// ====== CLOUD =====
Expand Down