Skip to content

Commit

Permalink
DRILL-3987: (REFACTOR) Common and Vector modules building.
Browse files Browse the repository at this point in the history
- Extract Accountor interface from Implementation
- Separate FMPP modules to separate out Vector Needs versus external needs
- Separate out Vector classes from those that are VectorAccessible.
- Clean up Memory Exception hierarchy
  • Loading branch information
jacques-n committed Nov 13, 2015
1 parent 9969d8b commit 4524fdb
Show file tree
Hide file tree
Showing 97 changed files with 599 additions and 531 deletions.
2 changes: 1 addition & 1 deletion exec/java-exec/src/main/codegen/includes/vv_imports.ftl
Expand Up @@ -46,7 +46,7 @@ import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.util.JsonStringArrayList;

import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
import org.apache.drill.exec.exception.OutOfMemoryException;

import com.sun.codemodel.JType;
import com.sun.codemodel.JCodeModel;
Expand Down
95 changes: 95 additions & 0 deletions exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.drill.exec.vector.complex.UnionVector;

<@pp.dropOutputFile />
<@pp.changeOutputFile name="/org/apache/drill/exec/expr/TypeHelper.java" />

<#include "/@includes/license.ftl" />

package org.apache.drill.exec.expr;

<#include "/@includes/vv_imports.ftl" />
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.accessor.*;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.util.CallBack;

/**
 * Expression-layer type utilities layered on top of the generated
 * {@code BasicTypeHelper}.
 *
 * NOTE: this file is a FreeMarker template — the {@code <#list vv.types>}
 * loops below are expanded at build time, emitting one {@code case} per
 * generated minor vector type. The expanded output is written to
 * org/apache/drill/exec/expr/TypeHelper.java (see the
 * {@code <@pp.changeOutputFile>} directive earlier in this template).
 */
public class TypeHelper extends BasicTypeHelper{
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class);

  /**
   * Returns a {@link SqlAccessor} suited to reading values from the given
   * vector, chosen by the vector's minor type and data mode.
   *
   * @param vector the value vector to wrap
   * @return a type-specific accessor; {@code GenericAccessor} for REPEATED,
   *         MAP and LIST vectors
   * @throws UnsupportedOperationException if no accessor exists for the
   *         vector's type
   */
  public static SqlAccessor getSqlAccessor(ValueVector vector){
    final MajorType type = vector.getField().getType();
    switch(type.getMinorType()){
    case UNION:
      return new UnionSqlAccessor((UnionVector) vector);
    <#-- One case per generated minor type; every DataMode branch returns,
         so the generated cases cannot fall through into one another. -->
    <#list vv.types as type>
    <#list type.minor as minor>
    case ${minor.class?upper_case}:
      switch (type.getMode()) {
        case REQUIRED:
          return new ${minor.class}Accessor((${minor.class}Vector) vector);
        case OPTIONAL:
          return new Nullable${minor.class}Accessor((Nullable${minor.class}Vector) vector);
        case REPEATED:
          // Repeated values have no scalar representation; fall back to the
          // generic (object-returning) accessor.
          return new GenericAccessor(vector);
      }
    </#list>
    </#list>
    case MAP:
    case LIST:
      // Complex types are also served by the generic accessor.
      return new GenericAccessor(vector);
    }
    throw new UnsupportedOperationException(buildErrorMessage("find sql accessor", type));
  }

  /**
   * Resolves the CodeModel {@link JType} of the value-holder class for the
   * given minor type and data mode (used when generating expression code).
   *
   * @param model the CodeModel instance used to reference classes
   * @param type  the minor type whose holder is requested
   * @param mode  REQUIRED, OPTIONAL or REPEATED; selects the plain,
   *              Nullable- or Repeated- holder variant
   * @return the JType of the matching holder class
   * @throws UnsupportedOperationException if no holder exists for the
   *         type/mode combination
   */
  public static JType getHolderType(JCodeModel model, MinorType type, DataMode mode){
    switch (type) {
    case UNION:
      return model._ref(UnionHolder.class);
    case MAP:
    case LIST:
      // Complex types share a single holder regardless of mode.
      return model._ref(ComplexHolder.class);

    <#-- One case per generated minor type; every DataMode branch returns. -->
    <#list vv.types as type>
    <#list type.minor as minor>
    case ${minor.class?upper_case}:
      switch (mode) {
        case REQUIRED:
          return model._ref(${minor.class}Holder.class);
        case OPTIONAL:
          return model._ref(Nullable${minor.class}Holder.class);
        case REPEATED:
          return model._ref(Repeated${minor.class}Holder.class);
      }
    </#list>
    </#list>
    case GENERIC_OBJECT:
      return model._ref(ObjectHolder.class);
    default:
      break;
    }
    throw new UnsupportedOperationException(buildErrorMessage("get holder type", type, mode));
  }

}
Expand Up @@ -41,8 +41,8 @@
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
Expand Down
Expand Up @@ -28,25 +28,20 @@

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.util.AssertionUtil;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;

public class Accountor {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountor.class);
public class AccountorImpl implements Accountor {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AccountorImpl.class);

private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
private final AtomicRemainder remainder;
private final long total;
private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
private final FragmentHandle handle;
private String fragmentStr;
private Accountor parent;
private AccountorImpl parent;

private final boolean errorOnLeak;
// some operators are no subject to the fragment limit. They set the applyFragmentLimit to false
Expand All @@ -59,16 +54,17 @@ public class Accountor {

private final boolean applyFragmentLimit;

private final FragmentContext fragmentContext;
private final LimitConsumer limitConsumer;
long fragmentLimit;

private long peakMemoryAllocation = 0;

// The top level Allocator has an accountor that keeps track of all the FragmentContexts currently executing.
// The top level Allocator has an accountor that keeps track of all the LimitConsumers currently executing.
// This enables the top level accountor to calculate a new fragment limit whenever necessary.
private final List<FragmentContext> fragmentContexts;
private final List<LimitConsumer> limitConsumers;

public Accountor(DrillConfig config, boolean errorOnLeak, FragmentContext context, Accountor parent, long max, long preAllocated, boolean applyFragLimit) {
public AccountorImpl(DrillConfig config, boolean errorOnLeak, LimitConsumer context, AccountorImpl parent, long max,
long preAllocated, boolean applyFragLimit) {
// TODO: fix preallocation stuff
this.errorOnLeak = errorOnLeak;
AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
Expand All @@ -92,19 +88,17 @@ public Accountor(DrillConfig config, boolean errorOnLeak, FragmentContext contex

this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, preAllocated, applyFragmentLimit);
this.total = max;
this.fragmentContext=context;
this.handle = (context!=null) ? context.getHandle() : null;
this.fragmentStr= (handle!=null) ? ( handle.getMajorFragmentId()+":"+handle.getMinorFragmentId() ) : "0:0";
this.limitConsumer = context;
this.fragmentLimit=this.total; // Allow as much as possible to start with;
if (ENABLE_ACCOUNTING) {
buffers = Maps.newConcurrentMap();
} else {
buffers = null;
}
this.fragmentContexts = new ArrayList<FragmentContext>();
this.limitConsumers = new ArrayList<LimitConsumer>();
if(parent!=null && parent.parent==null){ // Only add the fragment context to the fragment level accountor
synchronized(this) {
addFragmentContext(this.fragmentContext);
addLimitConsumer(this.limitConsumer);
}
}
}
Expand All @@ -124,7 +118,10 @@ private boolean transfer(Accountor target, DrillBuf buf, long size, boolean rele
}

if (ENABLE_ACCOUNTING) {
target.buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread().getStackTrace()));
if (target instanceof AccountorImpl) {
((AccountorImpl) target).buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread()
.getStackTrace()));
}
}
return withinLimit;
}
Expand All @@ -149,7 +146,6 @@ public long getPeakMemoryAllocation() {
}

public boolean reserve(long size) {
logger.trace("Fragment:"+fragmentStr+" Reserved "+size+" bytes. Total Allocated: "+getAllocation());
boolean status = remainder.get(size, this.applyFragmentLimit);
peakMemoryAllocation = Math.max(peakMemoryAllocation, getAllocation());
return status;
Expand Down Expand Up @@ -210,19 +206,12 @@ public void release(DrillBuf buf, long size) {
}
}

private void addFragmentContext(FragmentContext c) {
private void addLimitConsumer(LimitConsumer c) {
if (parent != null){
parent.addFragmentContext(c);
parent.addLimitConsumer(c);
}else {
if(logger.isTraceEnabled()) {
FragmentHandle hndle;
String fragStr;
if(c!=null) {
hndle = c.getHandle();
fragStr = (hndle != null) ? (hndle.getMajorFragmentId() + ":" + hndle.getMinorFragmentId()) : "[Null Fragment Handle]";
}else{
fragStr = "[Null Context]";
}
String fragStr = c == null ? "[Null Context]" : c.getIdentifier();
fragStr+=" (Object Id: "+System.identityHashCode(c)+")";
StackTraceElement[] ste = (new Throwable()).getStackTrace();
StringBuffer sb = new StringBuffer();
Expand All @@ -234,32 +223,25 @@ private void addFragmentContext(FragmentContext c) {
logger.trace("Fragment " + fragStr + " added to root accountor.\n"+sb.toString());
}
synchronized(this) {
fragmentContexts.add(c);
limitConsumers.add(c);
}
}
}

private void removeFragmentContext(FragmentContext c) {
private void removeLimitConsumer(LimitConsumer c) {
if (parent != null){
if (parent.parent==null){
// only fragment level allocators will have the fragment context saved
parent.removeFragmentContext(c);
parent.removeLimitConsumer(c);
}
}else{
if(logger.isDebugEnabled()) {
FragmentHandle hndle;
String fragStr;
if (c != null) {
hndle = c.getHandle();
fragStr = (hndle != null) ? (hndle.getMajorFragmentId() + ":" + hndle.getMinorFragmentId()) : "[Null Fragment Handle]";
} else {
fragStr = "[Null Context]";
}
String fragStr = c == null ? "[Null Context]" : c.getIdentifier();
fragStr += " (Object Id: " + System.identityHashCode(c) + ")";
logger.trace("Fragment " + fragStr + " removed from root accountor");
}
synchronized(this) {
fragmentContexts.remove(c);
limitConsumers.remove(c);
}
}
}
Expand All @@ -279,13 +261,10 @@ public long resetFragmentLimits(){
//If the already running fragments end quickly, their limits will be assigned back to the remaining fragments
//quickly. If they are long running, then we want to favour them with larger limits anyway.
synchronized (this) {
int nFragments=fragmentContexts.size();
int nFragments = limitConsumers.size();
long allocatedMemory=0;
for(FragmentContext fragment: fragmentContexts){
BufferAllocator a = fragment.getAllocator();
if(a!=null) {
allocatedMemory += fragment.getAllocator().getAllocatedMemory();
}
for (LimitConsumer fragment : limitConsumers) {
allocatedMemory += fragment.getAllocated();
}
if(logger.isTraceEnabled()) {
logger.trace("Resetting Fragment Memory Limit: total Available memory== "+total
Expand All @@ -297,8 +276,8 @@ public long resetFragmentLimits(){
}
if(nFragments>0) {
long rem = (total - allocatedMemory) / nFragments;
for (FragmentContext fragment : fragmentContexts) {
fragment.setFragmentLimit((long) (rem * fragmentMemOvercommitFactor));
for (LimitConsumer fragment : limitConsumers) {
fragment.setLimit((long) (rem * fragmentMemOvercommitFactor));
}
}
if(logger.isTraceEnabled() && false){
Expand All @@ -309,23 +288,15 @@ public long resetFragmentLimits(){
sb.append(" Fragment Limit: ");
sb.append(this.getFragmentLimit());
logger.trace(sb.toString());
for(FragmentContext fragment: fragmentContexts){
for (LimitConsumer fragment : limitConsumers) {
sb= new StringBuffer();
if (handle != null) {
sb.append("[");
sb.append(QueryIdHelper.getQueryId(handle.getQueryId()));
sb.append("](");
sb.append(handle.getMajorFragmentId());
sb.append(":");
sb.append(handle.getMinorFragmentId());
sb.append(")");
}else{
sb.append("[fragment](0:0)");
}
sb.append('[');
sb.append(fragment.getIdentifier());
sb.append(']');
sb.append("Allocated memory: ");
sb.append(fragment.getAllocator().getAllocatedMemory());
sb.append(fragment.getAllocated());
sb.append(" Fragment Limit: ");
sb.append(fragment.getAllocator().getFragmentLimit());
sb.append(fragment.getLimit());
logger.trace(sb.toString());
}
logger.trace("Resetting Complete");
Expand All @@ -338,24 +309,18 @@ public long resetFragmentLimits(){
public void close() {
// remove the fragment context and reset fragment limits whenever an allocator closes
if(parent!=null && parent.parent==null) {
logger.debug("Fragment " + fragmentStr + " accountor being closed");
removeFragmentContext(fragmentContext);

logger.debug("Fragment " + limitConsumer.getIdentifier() + " accountor being closed");
removeLimitConsumer(limitConsumer);
}
resetFragmentLimits();

if (ENABLE_ACCOUNTING && !buffers.isEmpty()) {
StringBuffer sb = new StringBuffer();
sb.append("Attempted to close accountor with ");
sb.append(buffers.size());
sb.append(" buffer(s) still allocated");
if (handle != null) {
sb.append("for QueryId: ");
sb.append(QueryIdHelper.getQueryId(handle.getQueryId()));
sb.append(", MajorFragmentId: ");
sb.append(handle.getMajorFragmentId());
sb.append(", MinorFragmentId: ");
sb.append(handle.getMinorFragmentId());
}
sb.append(" buffer(s) still allocated for ");
sb.append(limitConsumer.getIdentifier());
sb.append(".\n");

Multimap<DebugStackTrace, DebugStackTrace> multi = LinkedListMultimap.create();
Expand Down Expand Up @@ -400,7 +365,7 @@ public void setFragmentLimit(long add) {
if (parent != null && parent.parent==null) { // This is a fragment level accountor
this.fragmentLimit=getAllocation()+add;
this.remainder.setLimit(this.fragmentLimit);
logger.trace("Fragment "+fragmentStr+" memory limit set to "+this.fragmentLimit);
logger.trace("Fragment " + limitConsumer.getIdentifier() + " memory limit set to " + this.fragmentLimit);
}
}

Expand Down

0 comments on commit 4524fdb

Please sign in to comment.