Skip to content

Commit

Permalink
Merge e2019a3 into 7886a24
Browse files Browse the repository at this point in the history
  • Loading branch information
kyotoYaho committed May 20, 2020
2 parents 7886a24 + e2019a3 commit 2eb9678
Show file tree
Hide file tree
Showing 12 changed files with 734 additions and 45 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@

public class CacheConstants {
public static final String QUERY_CACHE = "StorageCache";
public static final String USER_CACHE = "UserCache";
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ public class MemcachedCacheManager extends AbstractCacheManager {
protected Collection<? extends Cache> loadCaches() {
Cache successCache = new MemCachedCacheAdaptor(
new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.QUERY_CACHE)));
Cache userCache = new MemCachedCacheAdaptor(
new MemcachedCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.USER_CACHE, 86400)));

addCache(successCache);
addCache(userCache);

Collection<String> names = getCacheNames();
Collection<Cache> caches = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ void disableRemoteCacheManager() {
void enableRemoteCacheManager() {
remoteCacheManager.setClusterHealth(true);
}

@VisibleForTesting
MemcachedCacheManager getRemoteCacheManager() {
return remoteCacheManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -136,13 +139,24 @@ public static MemcachedCache create(final MemcachedCacheConfig config, String me
.setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
.setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
return new MemcachedCache(new MemcachedClient(new MemcachedConnectionFactory(connectionFactory),
AddrUtil.getAddresses(hostsStr)), config, memcachedPrefix, timeToLive);
getResolvedAddrList(hostsStr)), config, memcachedPrefix, timeToLive);
} catch (IOException e) {
logger.error("Unable to create MemcachedCache instance.", e);
throw Throwables.propagate(e);
}
}

public static List<InetSocketAddress> getResolvedAddrList(String hostsStr) {
List<InetSocketAddress> addrs = AddrUtil.getAddresses(hostsStr);
Iterator<InetSocketAddress> addrIterator = addrs.iterator();
while (addrIterator.hasNext()) {
if (addrIterator.next().isUnresolved()) {
addrIterator.remove();
}
}
return addrs;
}

public String getName() {
return memcachedPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
//
//import net.spy.memcached.MemcachedClientIF;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:cacheContext.xml" })
Expand All @@ -58,5 +60,17 @@ public void testCacheManager() {
cacheManager.enableRemoteCacheManager();
Assert.assertTrue("Memcached enabled",
cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor);
//
// MemcachedCacheManager remoteCacheManager = cacheManager.getRemoteCacheManager();
// for (int i = 0; i < 1000; i++) {
// MemcachedClientIF client = (MemcachedClientIF) remoteCacheManager.getCache(QUERY_CACHE).getNativeCache();
// System.out.println(i + " available servers: " + client.getAvailableServers() + "; unavailable servers: "
// + client.getUnavailableServers());
// try {
// client.get("key");
// Thread.sleep(2000L);
// } catch (Exception e) {
// }
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -81,4 +83,11 @@ public void testGet() {
Assert.assertEquals("The value should not change", keyValueMap.get(key), memCachedAdaptor.get(key).get());
}
}

@Test
public void testGetResolvedAddrList() {
String hostsStr = "localhost:11211,fafddafaf:11211,fadfafaerqr:11211";
List<InetSocketAddress> addrList = MemcachedCache.getResolvedAddrList(hostsStr);
Assert.assertEquals(1, addrList.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@

package org.apache.kylin.rest.security;

import static org.apache.kylin.cache.cachemanager.CacheConstants.USER_CACHE;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.rest.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.security.authentication.AuthenticationProvider;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.AuthenticationException;
Expand All @@ -35,9 +40,7 @@
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.util.Assert;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.base.Preconditions;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

Expand All @@ -48,21 +51,13 @@ public class KylinAuthenticationProvider implements AuthenticationProvider {

private static final Logger logger = LoggerFactory.getLogger(KylinAuthenticationProvider.class);

private final static com.google.common.cache.Cache<String, Authentication> userCache = CacheBuilder.newBuilder()
.maximumSize(KylinConfig.getInstanceFromEnv().getServerUserCacheMaxEntries())
.expireAfterWrite(KylinConfig.getInstanceFromEnv().getServerUserCacheExpireSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<String, Authentication>() {
@Override
public void onRemoval(RemovalNotification<String, Authentication> notification) {
KylinAuthenticationProvider.logger.debug("User cache {} is removed due to {}",
notification.getKey(), notification.getCause());
}
}).build();

@Autowired
@Qualifier("userService")
UserService userService;

@Autowired
private CacheManager cacheManager;

//Embedded authentication provider
private AuthenticationProvider authenticationProvider;

Expand All @@ -75,19 +70,21 @@ public KylinAuthenticationProvider(AuthenticationProvider authenticationProvider
hf = Hashing.murmur3_128();
}

@PostConstruct
public void init() {
Preconditions.checkNotNull(cacheManager, "cacheManager is not injected yet");
}

@Override
public Authentication authenticate(Authentication authentication) throws AuthenticationException {

byte[] hashKey = hf.hashString(authentication.getName() + authentication.getCredentials()).asBytes();
String userKey = Arrays.toString(hashKey);

if (userService.isEvictCacheFlag()) {
userCache.invalidateAll();
userService.setEvictCacheFlag(false);
}
Authentication authed = userCache.getIfPresent(userKey);

if (null != authed) {
Authentication authed;
Cache.ValueWrapper authedUser = cacheManager.getCache(USER_CACHE).get(userKey);
if (authedUser != null) {
authed = (Authentication) authedUser.get();
SecurityContextHolder.getContext().setAuthentication(authed);
} else {
try {
Expand Down Expand Up @@ -118,7 +115,7 @@ public Authentication authenticate(Authentication authentication) throws Authent
userService.updateUser(user);
}

userCache.put(userKey, authed);
cacheManager.getCache(USER_CACHE).put(userKey, authed);
} catch (AuthenticationException e) {
logger.error("Failed to auth user: " + authentication.getName(), e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,6 @@ public KylinUserService(List<User> users) throws IOException {

protected ResourceStore aclStore;

private boolean evictCacheFlag = false;

@Override
public boolean isEvictCacheFlag() {
return evictCacheFlag;
}

@Override
public void setEvictCacheFlag(boolean evictCacheFlag) {
this.evictCacheFlag = evictCacheFlag;
}

@PostConstruct
public void init() throws IOException {
aclStore = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
Expand Down Expand Up @@ -156,7 +144,6 @@ public void updateUser(UserDetails user) {
}
getKylinUserManager().update(managedUser);
logger.trace("update user : {}", user.getUsername());
setEvictCacheFlag(true);
}

@Override
Expand All @@ -166,7 +153,6 @@ public void deleteUser(String userName) {
}
getKylinUserManager().delete(userName);
logger.trace("delete user : {}", userName);
setEvictCacheFlag(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@

package org.apache.kylin.rest.service;

import org.apache.kylin.rest.security.ManagedUser;
import org.springframework.security.provisioning.UserDetailsManager;

import java.io.IOException;
import java.util.List;

public interface UserService extends UserDetailsManager {

boolean isEvictCacheFlag();
import org.apache.kylin.rest.security.ManagedUser;
import org.springframework.security.provisioning.UserDetailsManager;

void setEvictCacheFlag(boolean evictCacheFlag);
public interface UserService extends UserDetailsManager {

List<ManagedUser> listUsers() throws IOException;

Expand Down
7 changes: 7 additions & 0 deletions server/src/main/resources/ehcache-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
>
<persistence strategy="none"/>
</cache>
<cache name="UserCache"
eternal="false"
timeToLiveSeconds="10800"
memoryStoreEvictionPolicy="LRU"
>
<persistence strategy="none"/>
</cache>
<cache name="ExceptionQueryCache"
eternal="false"
timeToIdleSeconds="86400"
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/resources/ehcache.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
>
<persistence strategy="none"/>
</cache>
<cache name="UserCache"
eternal="false"
timeToIdleSeconds="86400"
memoryStoreEvictionPolicy="LRU"
>
<persistence strategy="none"/>
</cache>
<cache name="ExceptionQueryCache"
eternal="false"
timeToIdleSeconds="86400"
Expand Down

0 comments on commit 2eb9678

Please sign in to comment.