Skip to content

Commit

Permalink
DRILL-4134: Allocator Improvements
Browse files Browse the repository at this point in the history
- make Allocator mostly lockless
- change BaseAllocator maps to direct references
- add documentation around memory management model
- move transfer and ownership methods to DrillBuf
- Improve debug messaging.
- Fix/revert sort changes
- Remove unused fragment limit flag
- Add time to HistoricalLog events
- Remove reservation amount from RootAllocator constructor (since not allowed)
- Fix concurrency issue where allocator is closing at same moment as incoming batch transfer, causing leaked memory and/or query failure.
- Add new AutoCloseables.close(Iterable<AutoCloseable>)
- Remove extraneous DataResponseHandler and Impl (and update TestBitRpc to use smarter mock of FragmentManager)
- Remove the concept of poison pill record batches, using instead FragmentContext.isOverMemoryLimit()
- Update incoming data batches so that they are transferred under protection of a close lock
- Improve field names in IncomingBuffers and move synchronization to collectors as opposed to IncomingBuffers (also change decrementing to decrementToZero rather than two part check).

This closes #238.
  • Loading branch information
jacques-n committed Dec 22, 2015
1 parent 53dcabe commit 809f462
Show file tree
Hide file tree
Showing 110 changed files with 3,587 additions and 5,854 deletions.
Expand Up @@ -68,7 +68,7 @@ public static void close(AutoCloseable... autoCloseables) throws Exception {
* Closes all autoCloseables if not null and suppresses subsequent exceptions if more than one
* @param autoCloseables the closeables to close
*/
public static void close(Collection<? extends AutoCloseable> ac) throws Exception {
public static void close(Iterable<? extends AutoCloseable> ac) throws Exception {
Exception topLevelException = null;
for (AutoCloseable closeable : ac) {
try {
Expand Down
68 changes: 37 additions & 31 deletions common/src/main/java/org/apache/drill/common/HistoricalLog.java
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.common;

import java.util.Arrays;
import java.util.LinkedList;

import org.slf4j.Logger;
Expand All @@ -30,9 +31,11 @@ public class HistoricalLog {
private static class Event {
private final String note; // the event text
private final StackTrace stackTrace; // where the event occurred
private final long time;

public Event(final String note) {
this.note = note;
this.time = System.nanoTime();
stackTrace = new StackTrace();
}
}
Expand Down Expand Up @@ -106,8 +109,8 @@ public synchronized void recordEvent(final String noteFormat, Object... args) {
*
* @param sb {@link StringBuilder} to write to
*/
public void buildHistory(final StringBuilder sb) {
buildHistory(sb, null);
public void buildHistory(final StringBuilder sb, boolean includeStackTrace) {
buildHistory(sb, 0, includeStackTrace);
}

/**
Expand All @@ -119,34 +122,50 @@ public void buildHistory(final StringBuilder sb) {
* @param additional an extra string that will be written between the identifying
* information and the history; often used for a current piece of state
*/
public synchronized void buildHistory(final StringBuilder sb, final CharSequence additional) {
sb.append('\n')
.append(idString);

if (additional != null) {
sb.append('\n')
.append(additional)
.append('\n');
}
/**
*
* @param sb
* @param indexLevel
* @param includeStackTrace
*/
public synchronized void buildHistory(final StringBuilder sb, int indent, boolean includeStackTrace) {
final char[] indentation = new char[indent];
final char[] innerIndentation = new char[indent + 2];
Arrays.fill(indentation, ' ');
Arrays.fill(innerIndentation, ' ');

sb.append(indentation)
.append("event log for: ")
.append(idString)
.append('\n');

sb.append(" event log\n");

if (firstEvent != null) {
sb.append(" ")
sb.append(innerIndentation)
.append(firstEvent.time)
.append(' ')
.append(firstEvent.note)
.append('\n');
firstEvent.stackTrace.writeToBuilder(sb, 4);
if (includeStackTrace) {
firstEvent.stackTrace.writeToBuilder(sb, indent + 2);
}

for(final Event event : history) {
if (event == firstEvent) {
continue;
}

sb.append(" ")
sb.append(innerIndentation)
.append(" ")
.append(event.time)
.append(' ')
.append(event.note)
.append('\n');

event.stackTrace.writeToBuilder(sb, 4);
if (includeStackTrace) {
event.stackTrace.writeToBuilder(sb, indent + 2);
sb.append('\n');
}
}
}
}
Expand All @@ -157,23 +176,10 @@ public synchronized void buildHistory(final StringBuilder sb, final CharSequence
* events with their stack traces.
*
* @param logger {@link Logger} to write to
* @param additional an extra string that will be written between the identifying
* information and the history; often used for a current piece of state
*/
public void logHistory(final Logger logger, final CharSequence additional) {
public void logHistory(final Logger logger) {
final StringBuilder sb = new StringBuilder();
buildHistory(sb, additional);
buildHistory(sb, 0, true);
logger.debug(sb.toString());
}

/**
* Write the history of this object to the given {@link Logger}. The history
* includes the identifying string provided at construction time, and all the recorded
* events with their stack traces.
*
* @param logger {@link Logger} to write to
*/
public void logHistory(final Logger logger) {
logHistory(logger, null);
}
}
9 changes: 5 additions & 4 deletions common/src/main/java/org/apache/drill/common/StackTrace.java
Expand Up @@ -36,13 +36,14 @@ public StackTrace() {

/**
* Write the stack trace to a StringBuilder.
*
* @param sb where to write it
* @param indent how many spaces to indent each line
* @param sb
* where to write it
* @param indent
* how many double spaces to indent each line
*/
public void writeToBuilder(final StringBuilder sb, final int indent) {
// create the indentation string
final char[] indentation = new char[indent];
final char[] indentation = new char[indent * 2];
Arrays.fill(indentation, ' ');

// write the stack trace in standard Java format
Expand Down
Expand Up @@ -15,20 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.rpc.data;
package org.apache.drill.common.concurrent;

import io.netty.buffer.DrillBuf;
import java.util.concurrent.locks.Lock;

import java.io.IOException;
/**
* Simple wrapper class that allows Locks to be released via an try-with-resources block.
*/
public class AutoCloseableLock implements AutoCloseable {

private final Lock lock;

import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.work.fragment.FragmentManager;
public AutoCloseableLock(Lock lock) {
this.lock = lock;
}

public interface DataResponseHandler {
public AutoCloseableLock open() {
lock.lock();
return this;
}

public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch,
DrillBuf data, AckSender sender) throws FragmentSetupException, IOException;
@Override
public void close() {
lock.unlock();
}

public void informOutOfMemory();
}
Expand Up @@ -24,8 +24,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
Expand Down Expand Up @@ -55,6 +53,9 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@SuppressWarnings("unused")
public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {

Expand Down Expand Up @@ -94,8 +95,7 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
}
}

final OperatorContext oContext = context.newOperatorContext(config,
false /* ScanBatch is not subject to fragment memory limit */);
final OperatorContext oContext = context.newOperatorContext(config);

int currentPartitionIndex = 0;
final List<RecordReader> readers = Lists.newArrayList();
Expand Down
5 changes: 0 additions & 5 deletions distribution/pom.xml
Expand Up @@ -44,11 +44,6 @@
<artifactId>drill-memory-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.drill.memory</groupId>
<artifactId>drill-memory-impl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-rpc</artifactId>
Expand Down
1 change: 0 additions & 1 deletion distribution/src/assemble/bin.xml
Expand Up @@ -89,7 +89,6 @@
<include>org.apache.drill:drill-logical:jar</include>
<include>org.apache.drill.exec:vector:jar</include>
<include>org.apache.drill.memory:drill-memory-base:jar</include>
<include>org.apache.drill.memory:drill-memory-impl:jar</include>
<include>org.apache.drill.exec:drill-rpc:jar</include>
<include>org.apache.drill.exec:drill-java-exec:jar</include>
<include>org.apache.drill.contrib.storage-hive:drill-storage-hive-core</include>
Expand Down
2 changes: 1 addition & 1 deletion exec/java-exec/pom.xml
Expand Up @@ -289,7 +289,7 @@
</dependency>
<dependency>
<groupId>org.apache.drill.memory</groupId>
<artifactId>drill-memory-impl</artifactId>
<artifactId>drill-memory-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
Expand Up @@ -30,8 +30,6 @@
public class StringFunctionHelpers {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StringFunctionHelpers.class);

private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;

static final int RADIX = 10;
static final long MAX_LONG = -Long.MAX_VALUE / RADIX;
static final int MAX_INT = -Integer.MAX_VALUE / RADIX;
Expand Down Expand Up @@ -212,7 +210,7 @@ public static String toStringFromUTF16(int start, int end, DrillBuf buffer) {
private static final ISOChronology CHRONOLOGY = org.joda.time.chrono.ISOChronology.getInstanceUTC();

public static long getDate(DrillBuf buf, int start, int end){
if(BOUNDS_CHECKING_ENABLED){
if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
buf.checkBytes(start, end);
}
return memGetDate(buf.memoryAddress(), start, end);
Expand Down
Expand Up @@ -27,8 +27,6 @@
public final class XXHash {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(XXHash.class);

private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;

static final long PRIME64_1 = UnsignedLongs.decode("11400714785074694791");
static final long PRIME64_2 = UnsignedLongs.decode("14029467366897019727");
static final long PRIME64_3 = UnsignedLongs.decode("1609587929392839161");
Expand Down Expand Up @@ -168,7 +166,7 @@ public static long hash64(double val, long seed){
}

public static long hash64(int start, int end, DrillBuf buffer, long seed){
if(BOUNDS_CHECKING_ENABLED){
if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
buffer.checkBytes(start, end);
}

Expand Down
Expand Up @@ -20,24 +20,21 @@
import org.apache.drill.common.config.DrillConfig;

public class RootAllocatorFactory {

public static final String TOP_LEVEL_MAX_ALLOC = "drill.memory.top.max";

/**
* Constructor to prevent instantiation of this static utility class.
*/
private RootAllocatorFactory() {}

/**
* Factory method.
*
* @param drillConfig the DrillConfig
* Create a new Root Allocator
* @param drillConfig
* the DrillConfig
* @return a new root allocator
*/
public static BufferAllocator newRoot(final DrillConfig drillConfig) {
/* TODO(cwestin)
if (BaseAllocator.DEBUG) {
return new RootAllocator(drillConfig);
}
*/
return new RootAllocator(drillConfig);
// TODO(cwestin) return new TopLevelAllocator(drillConfig);
return new RootAllocator(Math.min(DrillConfig.getMaxDirectMemory(), drillConfig.getLong(TOP_LEVEL_MAX_ALLOC)));
}
}
Expand Up @@ -55,9 +55,8 @@ public DrillBuf getManagedBuffer() {
}

public DrillBuf getManagedBuffer(int size) {
DrillBuf newBuf = allocator.buffer(size);
DrillBuf newBuf = allocator.buffer(size, this);
managedBuffers.put(newBuf.memoryAddress(), newBuf);
newBuf.setBufferManager(this);
return newBuf;
}
}

0 comments on commit 809f462

Please sign in to comment.