Skip to content

Commit

Permalink
PHOENIX-4165 Do not wait no new memory chunk can be allocated.
Browse files Browse the repository at this point in the history
  • Loading branch information
lhofhansl committed Sep 9, 2017
1 parent 2ad5d4b commit 44c0034
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 215 deletions.
Expand Up @@ -19,7 +19,6 @@

import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;

import java.util.Map;
Expand Down Expand Up @@ -150,8 +149,7 @@ private static long getMaxMemorySize(Configuration config) {
}

private GlobalCache(Configuration config) {
super(new GlobalMemoryManager(getMaxMemorySize(config),
config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_WAIT_MS)),
super(new GlobalMemoryManager(getMaxMemorySize(config)),
config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
this.config = config;
}
Expand Down
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.phoenix.memory;

import com.google.common.annotations.VisibleForTesting;
import org.apache.http.annotation.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,18 +32,14 @@ public class GlobalMemoryManager implements MemoryManager {

private final Object sync = new Object();
private final long maxMemoryBytes;
private final int maxWaitMs;
@GuardedBy("sync")
private volatile long usedMemoryBytes;
public GlobalMemoryManager(long maxBytes, int maxWaitMs) {
public GlobalMemoryManager(long maxBytes) {
if (maxBytes <= 0) {
throw new IllegalStateException("Total number of available bytes (" + maxBytes + ") must be greater than zero");
}
if (maxWaitMs < 0) {
throw new IllegalStateException("Maximum wait time (" + maxWaitMs + ") must be greater than or equal to zero");
throw new IllegalStateException(
"Total number of available bytes (" + maxBytes + ") must be greater than zero");
}
this.maxMemoryBytes = maxBytes;
this.maxWaitMs = maxWaitMs;
this.usedMemoryBytes = 0;
}

Expand All @@ -61,45 +56,34 @@ public long getMaxMemory() {
}


// TODO: Work on fairness: One big memory request can cause all others to block here.
// TODO: Work on fairness: One big memory request can cause all others to fail here.
private long allocateBytes(long minBytes, long reqBytes) {
if (minBytes < 0 || reqBytes < 0) {
throw new IllegalStateException("Minimum requested bytes (" + minBytes + ") and requested bytes (" + reqBytes + ") must be greater than zero");
throw new IllegalStateException("Minimum requested bytes (" + minBytes
+ ") and requested bytes (" + reqBytes + ") must be greater than zero");
}
if (minBytes > maxMemoryBytes) { // No need to wait, since we'll never have this much available
throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes is larger than global pool of " + maxMemoryBytes + " bytes.");
if (minBytes > maxMemoryBytes) {
throw new InsufficientMemoryException("Requested memory of " + minBytes
+ " bytes is larger than global pool of " + maxMemoryBytes + " bytes.");
}
long startTimeMs = System.currentTimeMillis(); // Get time outside of sync block to account for waiting for lock
long nBytes;
synchronized(sync) {
while (usedMemoryBytes + minBytes > maxMemoryBytes) { // Only wait if minBytes not available
waitForBytesToFree(minBytes, startTimeMs);
if (usedMemoryBytes + minBytes > maxMemoryBytes) {
throw new InsufficientMemoryException("Requested memory of " + minBytes
+ " bytes could not be allocated. Using memory of " + usedMemoryBytes
+ " bytes from global pool of " + maxMemoryBytes);
}
// Allocate at most reqBytes, but at least minBytes
nBytes = Math.min(reqBytes, maxMemoryBytes - usedMemoryBytes);
if (nBytes < minBytes) {
throw new IllegalStateException("Allocated bytes (" + nBytes + ") should be at least the minimum requested bytes (" + minBytes + ")");
throw new IllegalStateException("Allocated bytes (" + nBytes
+ ") should be at least the minimum requested bytes (" + minBytes + ")");
}
usedMemoryBytes += nBytes;
}
return nBytes;
}

@VisibleForTesting
void waitForBytesToFree(long minBytes, long startTimeMs) {
try {
logger.debug("Waiting for " + (usedMemoryBytes + minBytes - maxMemoryBytes) + " bytes to be free " + startTimeMs);
long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() - startTimeMs);
if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for some memory to get freed up
throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes could not be allocated. Using memory of " + usedMemoryBytes + " bytes from global pool of " + maxMemoryBytes + " bytes after waiting for " + maxWaitMs + "ms.");
}
sync.wait(remainingWaitTimeMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted allocation of " + minBytes + " bytes", ie);
}
}

@Override
public MemoryChunk allocate(long minBytes, long reqBytes) {
long nBytes = allocateBytes(minBytes, reqBytes);
Expand Down Expand Up @@ -130,9 +114,7 @@ private GlobalMemoryChunk(long size) {

@Override
public long getSize() {
synchronized(sync) {
return size; // TODO: does this need to be synchronized?
}
return size;
}

@Override
Expand All @@ -145,7 +127,6 @@ public void resize(long nBytes) {
if (nAdditionalBytes < 0) {
usedMemoryBytes += nAdditionalBytes;
size = nBytes;
sync.notifyAll();
} else {
allocateBytes(nAdditionalBytes, nAdditionalBytes);
size = nBytes;
Expand Down Expand Up @@ -174,7 +155,6 @@ private void freeMemory() {
synchronized(sync) {
usedMemoryBytes -= size;
size = 0;
sync.notifyAll();
}
}

Expand Down
Expand Up @@ -47,8 +47,7 @@ public BaseQueryServicesImpl(ReadOnlyProps defaultProps, QueryServicesOptions op
options.getQueueSize(),
options.isGlobalMetricsEnabled());
this.memoryManager = new GlobalMemoryManager(
Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100,
options.getMaxMemoryWaitMs());
Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100);
this.props = options.getProps(defaultProps);
this.queryOptimizer = new QueryOptimizer(this);
}
Expand Down
Expand Up @@ -74,7 +74,6 @@ public interface QueryServices extends SQLCloseable {
public static final String SCAN_RESULT_CHUNK_SIZE = "phoenix.query.scanResultChunkSize";

public static final String MAX_MEMORY_PERC_ATTRIB = "phoenix.query.maxGlobalMemoryPercentage";
public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs";
public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage";
public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = "phoenix.query.maxServerCacheBytes";
public static final String DATE_FORMAT_TIMEZONE_ATTRIB = "phoenix.query.dateFormatTimeZone";
Expand Down
Expand Up @@ -48,7 +48,6 @@
import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MUTATION_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB;
Expand Down Expand Up @@ -128,7 +127,6 @@ public class QueryServicesOptions {
public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
public static final String DEFAULT_SPOOL_DIRECTORY = System.getProperty("java.io.tmpdir");
public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
public static final int DEFAULT_MAX_MEMORY_WAIT_MS = 10000;
public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
public static final long DEFAULT_MAX_SERVER_CACHE_SIZE = 1024*1024*100; // 100 Mb
public static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 32;
Expand Down Expand Up @@ -362,7 +360,6 @@ public static QueryServicesOptions withDefaults() {
.setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SPOOL_THRESHOLD_BYTES)
.setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY)
.setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC)
.setIfUnset(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS)
.setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
.setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, DEFAULT_MAX_SERVER_CACHE_SIZE)
.setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE)
Expand Down Expand Up @@ -475,10 +472,6 @@ public QueryServicesOptions setMaxMemoryPerc(int maxMemoryPerc) {
return set(MAX_MEMORY_PERC_ATTRIB, maxMemoryPerc);
}

public QueryServicesOptions setMaxMemoryWaitMs(int maxMemoryWaitMs) {
return set(MAX_MEMORY_WAIT_MS_ATTRIB, maxMemoryWaitMs);
}

public QueryServicesOptions setMaxTenantMemoryPerc(int maxTenantMemoryPerc) {
return set(MAX_TENANT_MEMORY_PERC_ATTRIB, maxTenantMemoryPerc);
}
Expand Down Expand Up @@ -568,10 +561,6 @@ public int getMaxMemoryPerc() {
return config.getInt(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC);
}

public int getMaxMemoryWaitMs() {
return config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS);
}

public int getMaxMutateSize() {
return config.getInt(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE);
}
Expand Down
Expand Up @@ -42,8 +42,7 @@ public class TenantCacheTest {
public void testInvalidateClosesMemoryChunk() throws SQLException {
int maxServerCacheTimeToLive = 10000;
long maxBytes = 1000;
int maxWaitMs = 1000;
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs);
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive);
ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a"));
ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
Expand All @@ -57,8 +56,7 @@ public void testInvalidateClosesMemoryChunk() throws SQLException {
public void testTimeoutClosesMemoryChunk() throws Exception {
int maxServerCacheTimeToLive = 10;
long maxBytes = 1000;
int maxWaitMs = 10;
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs);
GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
ManualTicker ticker = new ManualTicker();
TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
Expand Down
Expand Up @@ -53,7 +53,7 @@ private void testSpooling(int threshold, long maxSizeSpool) throws Throwable {
new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
};

MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold, 0));
MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold));
ResultIterator scanner = new SpoolingResultIterator(SpoolingMetricsHolder.NO_OP_INSTANCE, MemoryMetricsHolder.NO_OP_INSTANCE, iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
AssertResults.assertResults(scanner, expectedResults);
}
Expand Down

0 comments on commit 44c0034

Please sign in to comment.