Skip to content

Commit

Permalink
PHOENIX-2557 Track unfree memory for server-side cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jtaylor-sfdc committed Jan 3, 2016
1 parent cdc9cae commit 205c7b8
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 71 deletions.
Expand Up @@ -81,7 +81,9 @@ public void close() throws SQLException {
Set<PhoenixConnection> connections = this.connections; Set<PhoenixConnection> connections = this.connections;
this.connections = Sets.newHashSet(); this.connections = Sets.newHashSet();
SQLCloseables.closeAll(connections); SQLCloseables.closeAll(connections);
clearCache(); long unfreedBytes = clearCache();
// FIXME: once PHOENIX-2556 is fixed, comment this back in
// assertEquals(0,unfreedBytes);
} finally { } finally {
super.close(); super.close();
} }
Expand Down
Expand Up @@ -22,20 +22,24 @@
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.ChildMemoryManager; import org.apache.phoenix.memory.ChildMemoryManager;
import org.apache.phoenix.memory.GlobalMemoryManager; import org.apache.phoenix.memory.GlobalMemoryManager;
import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.schema.PMetaDataEntity;
import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.SizedUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
Expand All @@ -51,6 +55,7 @@
* @since 0.1 * @since 0.1
*/ */
public class GlobalCache extends TenantCacheImpl { public class GlobalCache extends TenantCacheImpl {
private static final Logger logger = LoggerFactory.getLogger(GlobalCache.class);
private static volatile GlobalCache INSTANCE; private static volatile GlobalCache INSTANCE;


private final Configuration config; private final Configuration config;
Expand All @@ -59,8 +64,24 @@ public class GlobalCache extends TenantCacheImpl {
// Cache for lastest PTable for a given Phoenix table // Cache for lastest PTable for a given Phoenix table
private volatile Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache; private volatile Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache;


public void clearTenantCache() { public long clearTenantCache() {
long unfreedBytes = getMemoryManager().getMaxMemory() - getMemoryManager().getAvailableMemory();
if (unfreedBytes != 0 && logger.isDebugEnabled()) {
logger.debug("Found " + (getMemoryManager().getMaxMemory() - getMemoryManager().getAvailableMemory()) + " bytes not freed from global cache");
}
removeAllServerCache();
for (Map.Entry<ImmutableBytesWritable, TenantCache> entry : perTenantCacheMap.entrySet()) {
TenantCache cache = entry.getValue();
long unfreedTenantBytes = cache.getMemoryManager().getMaxMemory() - cache.getMemoryManager().getAvailableMemory();
if (unfreedTenantBytes != 0 && logger.isDebugEnabled()) {
ImmutableBytesWritable cacheId = entry.getKey();
logger.debug("Found " + unfreedTenantBytes + " bytes not freed for tenant " + Bytes.toStringBinary(cacheId.get(), cacheId.getOffset(), cacheId.getLength()));
}
unfreedBytes += unfreedTenantBytes;
cache.removeAllServerCache();
}
perTenantCacheMap.clear(); perTenantCacheMap.clear();
return unfreedBytes;
} }


public Cache<ImmutableBytesPtr,PMetaDataEntity> getMetaDataCache() { public Cache<ImmutableBytesPtr,PMetaDataEntity> getMetaDataCache() {
Expand Down
Expand Up @@ -21,7 +21,6 @@
import java.sql.SQLException; import java.sql.SQLException;


import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.memory.MemoryManager;
Expand All @@ -38,5 +37,6 @@ public interface TenantCache {
MemoryManager getMemoryManager(); MemoryManager getMemoryManager();
Closeable getServerCache(ImmutableBytesPtr cacheId); Closeable getServerCache(ImmutableBytesPtr cacheId);
Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException; Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException; void removeServerCache(ImmutableBytesPtr cacheId);
void removeAllServerCache();
} }
Expand Up @@ -29,6 +29,7 @@
import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.Closeables;


import com.google.common.base.Ticker;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalListener;
Expand All @@ -45,11 +46,30 @@
public class TenantCacheImpl implements TenantCache { public class TenantCacheImpl implements TenantCache {
private final int maxTimeToLiveMs; private final int maxTimeToLiveMs;
private final MemoryManager memoryManager; private final MemoryManager memoryManager;
private final Ticker ticker;
private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches; private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;


public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) { public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
this(memoryManager, maxTimeToLiveMs, Ticker.systemTicker());
}

public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, Ticker ticker) {
this.memoryManager = memoryManager; this.memoryManager = memoryManager;
this.maxTimeToLiveMs = maxTimeToLiveMs; this.maxTimeToLiveMs = maxTimeToLiveMs;
this.ticker = ticker;
}

public Ticker getTicker() {
return ticker;
}

// For testing
public void cleanUp() {
synchronized(this) {
if (serverCaches != null) {
serverCaches.cleanUp();
}
}
} }


@Override @Override
Expand All @@ -64,6 +84,7 @@ private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
if (serverCaches == null) { if (serverCaches == null) {
serverCaches = CacheBuilder.newBuilder() serverCaches = CacheBuilder.newBuilder()
.expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS) .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
.ticker(getTicker())
.removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){ .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
@Override @Override
public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) { public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
Expand Down Expand Up @@ -99,7 +120,12 @@ public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritabl
} }


@Override @Override
public void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException { public void removeServerCache(ImmutableBytesPtr cacheId) {
getServerCaches().invalidate(cacheId); getServerCaches().invalidate(cacheId);
} }

@Override
public void removeAllServerCache() {
getServerCaches().invalidateAll();
}
} }
Expand Up @@ -2668,7 +2668,10 @@ public void clearCache(RpcController controller, ClearCacheRequest request,
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache(); GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidateAll(); metaDataCache.invalidateAll();
cache.clearTenantCache(); long unfreedBytes = cache.clearTenantCache();
ClearCacheResponse.Builder builder = ClearCacheResponse.newBuilder();
builder.setUnfreedBytes(unfreedBytes);
done.run(builder.build());
} }


@Override @Override
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.phoenix.coprocessor; package org.apache.phoenix.coprocessor;


import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;


import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
Expand Down Expand Up @@ -91,11 +90,7 @@ public void removeServerCache(RpcController controller, RemoveServerCacheRequest
tenantId = new ImmutableBytesPtr(request.getTenantId().toByteArray()); tenantId = new ImmutableBytesPtr(request.getTenantId().toByteArray());
} }
TenantCache tenantCache = GlobalCache.getTenantCache(this.env, tenantId); TenantCache tenantCache = GlobalCache.getTenantCache(this.env, tenantId);
try { tenantCache.removeServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()));
tenantCache.removeServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()));
} catch (SQLException e) {
ProtobufUtil.setControllerException(controller, new IOException(e));
}
RemoveServerCacheResponse.Builder responseBuilder = RemoveServerCacheResponse.newBuilder(); RemoveServerCacheResponse.Builder responseBuilder = RemoveServerCacheResponse.newBuilder();
responseBuilder.setReturn(true); responseBuilder.setReturn(true);
RemoveServerCacheResponse result = responseBuilder.build(); RemoveServerCacheResponse result = responseBuilder.build();
Expand Down

0 comments on commit 205c7b8

Please sign in to comment.