From baca735260a8f163df516f5174774461cf303661 Mon Sep 17 00:00:00 2001 From: Chris Westin Date: Tue, 25 Aug 2015 16:45:35 -0700 Subject: [PATCH] DRILL-1942-hygiene: - add AutoCloseable to many classes - minor fixes - formatting --- .../cache/AbstractStreamSerializable.java | 6 +- .../LoopedAbstractDrillSerializable.java | 18 +- .../cache/VectorAccessibleSerializable.java | 31 +- .../exec/expr/fn/impl/StringFunctions.java | 310 ++++++++++-------- .../apache/drill/exec/ops/BufferManager.java | 13 +- .../drill/exec/physical/impl/ImplCreator.java | 8 +- .../exec/physical/impl/ScreenCreator.java | 24 +- .../physical/impl/SingleSenderCreator.java | 23 +- .../impl/TopN/PriorityQueueTemplate.java | 13 +- .../exec/physical/impl/WriterRecordBatch.java | 37 ++- .../impl/aggregate/StreamingAggBatch.java | 52 ++- .../BroadcastSenderRootExec.java | 7 +- .../impl/filter/FilterRecordBatch.java | 32 +- .../physical/impl/join/HashJoinBatch.java | 96 +++--- .../impl/join/NestedLoopJoinBatch.java | 14 +- .../physical/impl/limit/LimitRecordBatch.java | 42 ++- .../OrderedPartitionRecordBatch.java | 4 +- .../PartitionSenderRootExec.java | 14 +- .../impl/producer/ProducerConsumerBatch.java | 5 +- .../exec/physical/impl/sort/SortBatch.java | 4 +- .../impl/svremover/RemovingRecordBatch.java | 2 +- .../impl/window/WindowFrameRecordBatch.java | 8 +- .../impl/xsort/ExternalSortBatch.java | 6 +- .../physical/impl/xsort/MSortTemplate.java | 12 +- .../impl/xsort/PriorityQueueCopier.java | 17 +- .../xsort/PriorityQueueCopierTemplate.java | 8 +- .../exec/record/AbstractRecordBatch.java | 12 +- .../record/AbstractSingleRecordBatch.java | 8 +- .../drill/exec/record/RawFragmentBatch.java | 14 +- .../drill/exec/record/RecordBatchLoader.java | 25 +- .../exec/store/AbstractRecordReader.java | 2 +- 31 files changed, 425 insertions(+), 442 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java index ef488d6e113..1888e3bb1e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java @@ -21,14 +21,12 @@ import java.io.InputStream; import java.io.OutputStream; -public abstract class AbstractStreamSerializable extends LoopedAbstractDrillSerializable{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStreamSerializable.class); +public abstract class AbstractStreamSerializable extends LoopedAbstractDrillSerializable { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStreamSerializable.class); @Override public abstract void readFromStream(InputStream input) throws IOException; @Override public abstract void writeToStream(OutputStream output) throws IOException; - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java index d2a7458163a..82004364d26 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java @@ -33,23 +33,23 @@ import org.apache.drill.common.util.DataOutputOutputStream; /** - * Helper class that holds the basic functionality to interchangably use the different Drill serializble interfaces. - * This is package private as users should utilize either AbstractDataSerializable or AbstractStreamSerializable instead - * to avoid infinite loops. + * Helper class that holds the basic functionality to interchangeably use + * the different Drill serializble interfaces. This is package private as + * users should utilize either AbstractDataSerializable or AbstractStreamSerializable + * instead to avoid infinite loops. */ abstract class LoopedAbstractDrillSerializable implements DrillSerializable { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoopedAbstractDrillSerializable.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoopedAbstractDrillSerializable.class); @Override public void writeExternal(ObjectOutput out) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); writeToStream(baos); - byte[] ba = baos.toByteArray(); + final byte[] ba = baos.toByteArray(); out.writeInt(ba.length); out.write(ba); } - @Override public void read(DataInput input) throws IOException { readFromStream(DataInputInputStream.constructInputStream(input)); @@ -72,8 +72,8 @@ public void writeToStream(OutputStream output) throws IOException { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int len = in.readInt(); - byte[] bytes = new byte[len]; + final int len = in.readInt(); + final byte[] bytes = new byte[len]; in.readFully(bytes); readFromStream(new ByteArrayInputStream(bytes)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index a5d2ce8929b..71c904d9a5c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -52,7 +52,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { private VectorContainer va; private WritableBatch batch; - private BufferAllocator allocator; + private final BufferAllocator allocator; private int recordCount = -1; private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE; private SelectionVector2 sv2; @@ -61,7 +61,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { public VectorAccessibleSerializable(BufferAllocator allocator) { this.allocator = allocator; - this.va = new VectorContainer(); + va = new VectorContainer(); } public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocator) { @@ -77,16 +77,13 @@ public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocat */ public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, BufferAllocator allocator) { this.allocator = allocator; - if (batch != null) { - this.batch = batch; - } + this.batch = batch; if (sv2 != null) { this.sv2 = sv2; - this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE; + svMode = BatchSchema.SelectionVectorMode.TWO_BYTE; } } - /** * Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exits * and construct the vectors and add them to a vector container @@ -95,8 +92,8 @@ public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, B */ @Override public void readFromStream(InputStream input) throws IOException { - VectorContainer container = new VectorContainer(); - UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input); + final VectorContainer container = new VectorContainer(); + final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input); recordCount = batchDef.getRecordCount(); if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) { @@ -107,12 +104,12 @@ public void readFromStream(InputStream input) throws IOException { sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE); svMode = BatchSchema.SelectionVectorMode.TWO_BYTE; } - List vectorList = Lists.newArrayList(); - List fieldList = batchDef.getFieldList(); + final List vectorList = Lists.newArrayList(); + final List fieldList = batchDef.getFieldList(); for (SerializedField metaData : fieldList) { - int dataLength = metaData.getBufferLength(); - MaterializedField field = MaterializedField.create(metaData); - DrillBuf buf = allocator.buffer(dataLength); + final int dataLength = metaData.getBufferLength(); + final MaterializedField field = MaterializedField.create(metaData); + final DrillBuf buf = allocator.buffer(dataLength); final ValueVector vector; try { buf.writeBytes(input, dataLength); @@ -129,7 +126,6 @@ public void readFromStream(InputStream input) throws IOException { va = container; } - public void writeToStreamAndRetain(OutputStream output) throws IOException { retain = true; writeToStream(output); @@ -145,8 +141,8 @@ public void writeToStream(OutputStream output) throws IOException { Preconditions.checkNotNull(output); final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time(); - DrillBuf[] incomingBuffers = batch.getBuffers(); - UserBitShared.RecordBatchDef batchDef = batch.getDef(); + final DrillBuf[] incomingBuffers = batch.getBuffers(); + final UserBitShared.RecordBatchDef batchDef = batch.getDef(); /* DrillBuf associated with the selection vector */ DrillBuf svBuf = null; @@ -202,5 +198,4 @@ public VectorContainer get() { public SelectionVector2 getSv2() { return sv2; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java index 49f581fcd09..f5aeaf68b2d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.IntHolder; -import org.apache.drill.exec.expr.holders.NullableIntHolder; import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; @@ -50,18 +49,20 @@ private StringFunctions() {} */ @FunctionTemplate(name = "like", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Like implements DrillSimpleFunc{ + public static class Like implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Output BitHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike( // org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer))).matcher(""); } + @Override public void eval() { String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); @@ -70,7 +71,7 @@ public void eval() { } @FunctionTemplate(name = "like", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class LikeWithEscape implements DrillSimpleFunc{ + public static class LikeWithEscape implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @@ -78,12 +79,14 @@ public static class LikeWithEscape implements DrillSimpleFunc{ @Output BitHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike( // org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer), org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(escape.start, escape.end, escape.buffer))).matcher(""); } + @Override public void eval() { String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); @@ -92,19 +95,21 @@ public void eval() { } @FunctionTemplate(name = "ilike", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class ILike implements DrillSimpleFunc{ + public static class ILike implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Output BitHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike( // org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer)), java.util.regex.Pattern.CASE_INSENSITIVE).matcher(""); } + @Override public void eval() { String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); @@ -113,7 +118,7 @@ public void eval() { } @FunctionTemplate(name = "ilike", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class ILikeWithEscape implements DrillSimpleFunc{ + public static class ILikeWithEscape implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @@ -121,6 +126,7 @@ public static class ILikeWithEscape implements DrillSimpleFunc{ @Output BitHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike( // org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer), @@ -128,6 +134,7 @@ public void setup() { java.util.regex.Pattern.CASE_INSENSITIVE).matcher(""); } + @Override public void eval() { String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); @@ -136,17 +143,18 @@ public void eval() { } @FunctionTemplate(names = {"similar", "similar_to"}, scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Similar implements DrillSimpleFunc{ + public static class Similar implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Output BitHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { - matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexSimilar(org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer))).matcher(""); } + @Override public void eval() { String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); @@ -155,20 +163,21 @@ public void eval() { } @FunctionTemplate(names = {"similar", "similar_to"}, scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class SimilarWithEscape implements DrillSimpleFunc{ + public static class SimilarWithEscape implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Param(constant=true) VarCharHolder escape; @Output BitHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { - matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexSimilar( org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer), org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(escape.start, escape.end, escape.buffer))).matcher(""); } + @Override public void eval() { String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); @@ -180,7 +189,7 @@ public void eval() { * Replace all substring that match the regular expression with replacement. */ @FunctionTemplate(name = "regexp_replace", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class RegexpReplace implements DrillSimpleFunc{ + public static class RegexpReplace implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @@ -189,16 +198,17 @@ public static class RegexpReplace implements DrillSimpleFunc{ @Workspace java.util.regex.Matcher matcher; @Output VarCharHolder out; + @Override public void setup() { matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer)).matcher(""); } + @Override public void eval() { - out.start = 0; - String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); - String r = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(replacement.start, replacement.end, replacement.buffer); - byte [] bytea = matcher.reset(i).replaceAll(r).getBytes(java.nio.charset.Charset.forName("UTF-8")); + final String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); + final String r = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(replacement.start, replacement.end, replacement.buffer); + final byte [] bytea = matcher.reset(i).replaceAll(r).getBytes(java.nio.charset.Charset.forName("UTF-8")); out.buffer = buffer = buffer.reallocIfNeeded(bytea.length); out.buffer.setBytes(out.start, bytea); out.end = bytea.length; @@ -206,52 +216,56 @@ public void eval() { } @FunctionTemplate(names = {"char_length", "character_length", "length"}, scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class CharLength implements DrillSimpleFunc{ - + public static class CharLength implements DrillSimpleFunc { @Param VarCharHolder input; @Output BigIntHolder out; + @Override public void setup() {} + @Override public void eval() { out.value = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(input.buffer, input.start, input.end); } } @FunctionTemplate(name = "lengthUtf8", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class ByteLength implements DrillSimpleFunc{ - + public static class ByteLength implements DrillSimpleFunc { @Param VarBinaryHolder input; @Output BigIntHolder out; + @Override public void setup() {} + @Override public void eval() { out.value = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(input.buffer, input.start, input.end); } } @FunctionTemplate(name = "octet_length", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class OctetLength implements DrillSimpleFunc{ - + public static class OctetLength implements DrillSimpleFunc { @Param VarCharHolder input; @Output BigIntHolder out; + @Override public void setup() {} + @Override public void eval() { out.value = input.end - input.start; } } @FunctionTemplate(name = "bit_length", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class BitLength implements DrillSimpleFunc{ - + public static class BitLength implements DrillSimpleFunc { @Param VarCharHolder input; @Output BigIntHolder out; + @Override public void setup() {} + @Override public void eval() { out.value = (input.end - input.start) * 8; } @@ -266,18 +280,19 @@ public void eval() { * position('', '') 1 0 */ @FunctionTemplate(name = "position", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Position implements DrillSimpleFunc{ - + public static class Position implements DrillSimpleFunc { @Param VarCharHolder substr; @Param VarCharHolder str; @Output BigIntHolder out; + @Override public void setup() {} + @Override public void eval() { //Do string match. - int pos = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.stringLeftMatchUTF8(str.buffer, str.start, str.end, + final int pos = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.stringLeftMatchUTF8(str.buffer, str.start, str.end, substr.buffer, substr.start, substr.end); if (pos < 0) { out.value = 0; //indicate not found a matched substr. @@ -291,15 +306,16 @@ public void eval() { // same as function "position(substr, str) ", except the reverse order of argument. @FunctionTemplate(name = "strpos", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Strpos implements DrillSimpleFunc{ - + public static class Strpos implements DrillSimpleFunc { @Param VarCharHolder str; @Param VarCharHolder substr; @Output BigIntHolder out; + @Override public void setup() {} + @Override public void eval() { //Do string match. int pos = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.stringLeftMatchUTF8(str.buffer, str.start, str.end, @@ -311,22 +327,22 @@ public void eval() { out.value = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(str.buffer, str.start, pos) + 1; } } - } /* * Convert string to lower case. */ @FunctionTemplate(name = "lower", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class LowerCase implements DrillSimpleFunc{ - + public static class LowerCase implements DrillSimpleFunc { @Param VarCharHolder input; @Output VarCharHolder out; @Inject DrillBuf buffer; + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer = buffer.reallocIfNeeded(input.end- input.start); out.start = 0; @@ -349,22 +365,24 @@ public void eval() { * Convert string to upper case. */ @FunctionTemplate(name = "upper", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class UpperCase implements DrillSimpleFunc{ + public static class UpperCase implements DrillSimpleFunc { @Param VarCharHolder input; @Output VarCharHolder out; @Inject DrillBuf buffer; + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer = buffer.reallocIfNeeded(input.end- input.start); out.start = 0; out.end = input.end - input.start; for (int id = input.start; id < input.end; id++) { - byte currentByte = input.buffer.getByte(id); + byte currentByte = input.buffer.getByte(id); // 'A - Z' : 0x41 - 0x5A // 'a - z' : 0x61 - 0x7A @@ -381,8 +399,7 @@ public void eval() { // -- Valid "offset": [1, string_length], // -- Valid "length": [1, up to string_length - offset + 1], if length > string_length - offset +1, get the substr up to the string_lengt. @FunctionTemplate(names = {"substring", "substr"}, scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Substring implements DrillSimpleFunc{ - + public static class Substring implements DrillSimpleFunc { @Param VarCharHolder string; @Param BigIntHolder offset; @Param BigIntHolder length; @@ -390,10 +407,11 @@ public static class Substring implements DrillSimpleFunc{ @Output VarCharHolder out; @Workspace ByteBuf buffer; + @Override public void setup() { - } + @Override public void eval() { out.buffer = string.buffer; // if length is NOT positive, or offset is NOT positive, or input string is empty, return empty string. @@ -401,9 +419,8 @@ public void eval() { out.start = out.end = 0; } else { //Do 1st scan to counter # of character in string. - int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); - - int fromCharIdx = (int) offset.value; //the start position of char (inclusive) + final int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); + final int fromCharIdx = (int) offset.value; //the start position of char (inclusive) if (fromCharIdx > charCount ) { // invalid length, return empty string. out.start = out.end = 0; @@ -417,21 +434,21 @@ public void eval() { } } } - } @FunctionTemplate(names = {"substring", "substr"}, scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class SubstringOffset implements DrillSimpleFunc{ - + public static class SubstringOffset implements DrillSimpleFunc { @Param VarCharHolder string; @Param BigIntHolder offset; @Output VarCharHolder out; @Workspace ByteBuf buffer; + @Override public void setup() { } + @Override public void eval() { out.buffer = string.buffer; // if length is NOT positive, or offset is NOT positive, or input string is empty, return empty string. @@ -439,9 +456,8 @@ public void eval() { out.start = out.end = 0; } else { //Do 1st scan to counter # of character in string. - int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); - - int fromCharIdx = (int) offset.value; //the start position of char (inclusive) + final int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); + final int fromCharIdx = (int) offset.value; //the start position of char (inclusive) if (fromCharIdx > charCount ) { // invalid length, return empty string. out.start = out.end = 0; @@ -451,25 +467,25 @@ public void eval() { } } } - } @FunctionTemplate(names = {"substring", "substr" }, scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) - public static class SubstringRegex implements DrillSimpleFunc{ - + public static class SubstringRegex implements DrillSimpleFunc { @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Output NullableVarCharHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { matcher = java.util.regex.Pattern.compile( org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer)) .matcher(""); } + @Override public void eval() { - String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); + final String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); if (matcher.find()) { out.isSet = 1; @@ -481,24 +497,25 @@ public void eval() { } @FunctionTemplate(names = {"substring", "substr" }, scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) - public static class SubstringRegexNullable implements DrillSimpleFunc{ - + public static class SubstringRegexNullable implements DrillSimpleFunc { @Param NullableVarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Output NullableVarCharHolder out; @Workspace java.util.regex.Matcher matcher; + @Override public void setup() { matcher = java.util.regex.Pattern.compile( org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer)) .matcher(""); } + @Override public void eval() { if (input.isSet == 0) { out.isSet = 0; } else { - String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); + final String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); matcher.reset(i); if (matcher.find()) { out.isSet = 1; @@ -517,17 +534,18 @@ public void eval() { // If length = 0, return empty // If length < 0, and |length| > total charcounts, return empty. @FunctionTemplate(name = "left", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Left implements DrillSimpleFunc{ - + public static class Left implements DrillSimpleFunc { @Param VarCharHolder string; @Param BigIntHolder length; @Output VarCharHolder out; @Workspace ByteBuf buffer; + @Override public void setup() { } + @Override public void eval() { out.buffer = string.buffer; // if length is 0, or input string is empty, return empty string. @@ -535,13 +553,14 @@ public void eval() { out.start = out.end = 0; } else { //Do 1st scan to counter # of character in string. - int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); - - int charLen = 0; + final int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); + final int charLen; if (length.value > 0) { charLen = Math.min((int)length.value, charCount); //left('abc', 5) -> 'abc' } else if (length.value < 0) { charLen = Math.max(0, charCount + (int)length.value) ; // left('abc', -5) ==> '' + } else { + charLen = 0; } out.start = string.start; //Starting from the left of input string. @@ -552,17 +571,18 @@ public void eval() { //Return last 'length' characters in the string. When 'length' is negative, return all but first |length| characters. @FunctionTemplate(name = "right", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Right implements DrillSimpleFunc{ - + public static class Right implements DrillSimpleFunc { @Param VarCharHolder string; @Param BigIntHolder length; @Output VarCharHolder out; @Workspace ByteBuf buffer; + @Override public void setup() { } + @Override public void eval() { out.buffer = string.buffer; // invalid length. @@ -570,10 +590,9 @@ public void eval() { out.start = out.end = 0; } else { //Do 1st scan to counter # of character in string. - int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); - - int fromCharIdx; //the start position of char (inclusive) - int charLen; // the end position of char (inclusive) + final int charCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(string.buffer, string.start, string.end); + final int fromCharIdx; //the start position of char (inclusive) + final int charLen; // the end position of char (inclusive) if (length.value > 0) { fromCharIdx = Math.max(charCount - (int) length.value + 1, 1); // right('abc', 5) ==> 'abc' fromCharIdx=1. charLen = charCount - fromCharIdx + 1; @@ -583,7 +602,7 @@ public void eval() { } // invalid length : right('abc', -5) -> '' - if (charLen <=0) { + if (charLen <= 0) { out.start = out.end = 0; } else { //Do 2nd scan of string. Get bytes corresponding chars in range. @@ -596,15 +615,16 @@ public void eval() { @FunctionTemplate(name = "initcap", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class InitCap implements DrillSimpleFunc{ - + public static class InitCap implements DrillSimpleFunc { @Param VarCharHolder input; @Output VarCharHolder out; @Inject DrillBuf buffer; + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer = buffer.reallocIfNeeded(input.end - input.start); out.start = 0; @@ -616,18 +636,19 @@ public void eval() { //Replace all occurrences in 'text' of substring 'from' with substring 'to' @FunctionTemplate(name = "replace", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Replace implements DrillSimpleFunc{ - + public static class Replace implements DrillSimpleFunc { @Param VarCharHolder text; @Param VarCharHolder from; @Param VarCharHolder to; @Inject DrillBuf buffer; @Output VarCharHolder out; + @Override public void setup() { buffer = buffer.reallocIfNeeded(8000); } + @Override public void eval() { out.buffer = buffer; out.start = out.end = 0; @@ -638,9 +659,9 @@ public void eval() { //If "from" is not empty and it's length is no longer than text's length //then, we may find a match, and do replace. int i = text.start; - for (; i<=text.end - fromL; ) { + for (; i <= text.end - fromL; ) { int j = from.start; - for (; j textCharCount && fillCharCount == 0) ) { + } else if (theLength == textCharCount || (theLength > textCharCount && fillCharCount == 0) ) { //case 2: target length is same as text's length, or need fill into text but "fill" is empty, then return text directly. out.buffer = text.buffer; out.start = text.start; out.end = text.end; - } else if (length.value < textCharCount) { + } else if (theLength < textCharCount) { //case 3: truncate text on the right side. It's same as substring(text, 1, length). out.buffer = text.buffer; out.start = text.start; - out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(text.buffer, text.start, text.end, (int)length.value); - } else if (length.value > textCharCount) { - //case 4: copy "fill" on left. Total # of char to copy : length.value - textCharCount + out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(text.buffer, text.start, text.end, (int) theLength); + } else if (theLength > textCharCount) { + //case 4: copy "fill" on left. Total # of char to copy : theLength - textCharCount int count = 0; out.buffer = buffer; out.start = out.end = 0; - while (count < length.value - textCharCount) { + while (count < theLength - textCharCount) { for (id = fill.start; id < fill.end; id++) { - if (count == length.value - textCharCount) { + if (count == theLength - textCharCount) { break; } @@ -746,7 +768,6 @@ public void eval() { } } } // end of eval - } /** @@ -754,8 +775,7 @@ public void eval() { * If the string is already longer than length then it is truncated. */ @FunctionTemplate(name = "rpad", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Rpad implements DrillSimpleFunc{ - + public static class Rpad implements DrillSimpleFunc { @Param VarCharHolder text; @Param BigIntHolder length; @Param VarCharHolder fill; @@ -763,11 +783,15 @@ public static class Rpad implements DrillSimpleFunc{ @Output VarCharHolder out; + @Override public void setup() { } + @Override public void eval() { - buffer = buffer.reallocIfNeeded((int) length.value*2); + final long theLength = length.value; + final int lengthNeeded = (int) (theLength <= 0 ? 0 : theLength * 2); + buffer = buffer.reallocIfNeeded(lengthNeeded); byte currentByte = 0; int id = 0; @@ -777,21 +801,21 @@ public void eval() { //get the char length of fill. int fillCharCount = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharLength(fill.buffer, fill.start, fill.end); - if (length.value <= 0) { + if (theLength <= 0) { //case 1: target length is <=0, then return an empty string. out.buffer = buffer; out.start = out.end = 0; - } else if (length.value == textCharCount || (length.value > textCharCount && fillCharCount == 0) ) { + } else if (theLength == textCharCount || (theLength > textCharCount && fillCharCount == 0) ) { //case 2: target length is same as text's length, or need fill into text but "fill" is empty, then return text directly. out.buffer = text.buffer; out.start = text.start; out.end = text.end; - } else if (length.value < textCharCount) { + } else if (theLength < textCharCount) { //case 3: truncate text on the right side. It's same as substring(text, 1, length). out.buffer = text.buffer; out.start = text.start; - out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(text.buffer, text.start, text.end, (int)length.value); - } else if (length.value > textCharCount) { + out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(text.buffer, text.start, text.end, (int) theLength); + } else if (theLength > textCharCount) { //case 4: copy "text" into "out", then copy "fill" on the right. out.buffer = buffer; out.start = out.end = 0; @@ -800,12 +824,12 @@ public void eval() { out.buffer.setByte(out.end++, text.buffer.getByte(id)); } - //copy "fill" on right. Total # of char to copy : length.value - textCharCount + //copy "fill" on right. Total # of char to copy : theLength - textCharCount int count = 0; - while (count < length.value - textCharCount) { + while (count < theLength - textCharCount) { for (id = fill.start; id < fill.end; id++) { - if (count == length.value - textCharCount) { + if (count == theLength - textCharCount) { break; } @@ -822,23 +846,23 @@ public void eval() { } } // end of eval - } /** * Remove the longest string containing only characters from "from" from the start of "text" */ @FunctionTemplate(name = "ltrim", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Ltrim implements DrillSimpleFunc{ - + public static class Ltrim implements DrillSimpleFunc { @Param VarCharHolder text; @Param VarCharHolder from; @Output VarCharHolder out; + @Override public void setup() { } + @Override public void eval() { out.buffer = text.buffer; out.start = out.end = text.end; @@ -855,23 +879,23 @@ public void eval() { } } } // end of eval - } /** * Remove the longest string containing only characters from "from" from the end of "text" */ @FunctionTemplate(name = "rtrim", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Rtrim implements DrillSimpleFunc{ - + public static class Rtrim implements DrillSimpleFunc { @Param VarCharHolder text; @Param VarCharHolder from; @Output VarCharHolder out; + @Override public void setup() { } + @Override public void eval() { out.buffer = text.buffer; out.start = out.end = text.start; @@ -897,16 +921,17 @@ public void eval() { * Remove the longest string containing only characters from "from" from the start of "text" */ @FunctionTemplate(name = "btrim", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class Btrim implements DrillSimpleFunc{ - + public static class Btrim implements DrillSimpleFunc { @Param VarCharHolder text; @Param VarCharHolder from; @Output VarCharHolder out; + @Override public void setup() { } + @Override public void eval() { out.buffer = text.buffer; out.start = out.end = text.start; @@ -929,10 +954,10 @@ public void eval() { id--; } bytePerChar = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.utf8CharLen(text.buffer, id); - int pos = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.stringLeftMatchUTF8(from.buffer, from.start, from.end, + final int pos = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.stringLeftMatchUTF8(from.buffer, from.start, from.end, text.buffer, id, id + bytePerChar); if (pos < 0) { // Found the 1st char not in "from", stop - out.end = id+ bytePerChar; + out.end = id + bytePerChar; break; } } @@ -940,15 +965,17 @@ public void eval() { } @FunctionTemplate(name = "concatOperator", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) - public static class ConcatOperator implements DrillSimpleFunc{ + public static class ConcatOperator implements DrillSimpleFunc { @Param VarCharHolder left; @Param VarCharHolder right; @Output VarCharHolder out; @Inject DrillBuf buffer; + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start)); out.start = out.end = 0; @@ -967,17 +994,17 @@ public void eval() { //Concatenate the text representations of the arguments. NULL arguments are ignored. //TODO: NullHanding.INTERNAL for DrillSimpleFunc requires change in code generation. @FunctionTemplate(name = "concat", scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) - public static class Concat implements DrillSimpleFunc{ - + public static class Concat implements DrillSimpleFunc { @Param VarCharHolder left; @Param VarCharHolder right; @Output VarCharHolder out; @Inject DrillBuf buffer; - + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start)); out.start = out.end = 0; @@ -994,17 +1021,17 @@ public void eval() { } @FunctionTemplate(name = "concat", scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) - public static class ConcatRightNullInput implements DrillSimpleFunc{ - + public static class ConcatRightNullInput implements DrillSimpleFunc { @Param VarCharHolder left; @Param NullableVarCharHolder right; @Output VarCharHolder out; @Inject DrillBuf buffer; - + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start));; out.start = out.end = 0; @@ -1015,25 +1042,25 @@ public void eval() { } if (right.isSet == 1) { - for (id = right.start; id < right.end; id++) { - out.buffer.setByte(out.end++, right.buffer.getByte(id)); - } + for (id = right.start; id < right.end; id++) { + out.buffer.setByte(out.end++, right.buffer.getByte(id)); + } } } } @FunctionTemplate(name = "concat", scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) - public static class ConcatLeftNullInput implements DrillSimpleFunc{ - + public static class ConcatLeftNullInput implements DrillSimpleFunc { @Param NullableVarCharHolder left; @Param VarCharHolder right; @Output VarCharHolder out; @Inject DrillBuf buffer; - + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start)); out.start = out.end = 0; @@ -1052,17 +1079,17 @@ public void eval() { } @FunctionTemplate(name = "concat", scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) - public static class ConcatBothNullInput implements DrillSimpleFunc{ - + public static class ConcatBothNullInput implements DrillSimpleFunc { @Param NullableVarCharHolder left; @Param NullableVarCharHolder right; @Output VarCharHolder out; @Inject DrillBuf buffer; - + @Override public void setup() { } + @Override public void eval() { out.buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start)); out.start = out.end = 0; @@ -1086,12 +1113,13 @@ public void eval() { // "\xca\xfe\xba\xbe" => (byte[]) {(byte)0xca, (byte)0xfe, (byte)0xba, (byte)0xbe} @FunctionTemplate(name = "binary_string", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) public static class BinaryString implements DrillSimpleFunc { - @Param VarCharHolder in; @Output VarBinaryHolder out; - public void setup() { } + @Override + public void setup() {} + @Override public void eval() { out.buffer = in.buffer; out.start = in.start; @@ -1104,16 +1132,17 @@ public void eval() { // (byte[]) {(byte)0xca, (byte)0xfe, (byte)0xba, (byte)0xbe} => "\xca\xfe\xba\xbe" @FunctionTemplate(name = "string_binary", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) public static class StringBinary implements DrillSimpleFunc { - @Param VarBinaryHolder in; @Output VarCharHolder out; @Workspace Charset charset; @Inject DrillBuf buffer; + @Override public void setup() { charset = java.nio.charset.Charset.forName("UTF-8"); } + @Override public void eval() { byte[] buf = org.apache.drill.common.util.DrillStringUtils.toBinaryString(in.buffer, in.start, in.end).getBytes(charset); buffer.setBytes(0, buf); @@ -1130,12 +1159,13 @@ public void eval() { */ @FunctionTemplate(name = "ascii", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) public static class AsciiString implements DrillSimpleFunc { - @Param VarCharHolder in; @Output IntHolder out; - public void setup() { } + @Override + public void setup() {} + @Override public void eval() { out.value = in.buffer.getByte(in.start); } @@ -1146,15 +1176,16 @@ public void eval() { */ @FunctionTemplate(name = "chr", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) public static class AsciiToChar implements DrillSimpleFunc { - @Param IntHolder in; @Output VarCharHolder out; @Inject DrillBuf buf; + @Override public void setup() { buf = buf.reallocIfNeeded(1); } + @Override public void eval() { out.buffer = buf; out.start = out.end = 0; @@ -1174,12 +1205,14 @@ public static class RepeatString implements DrillSimpleFunc { @Output VarCharHolder out; @Inject DrillBuf buffer; + @Override public void setup() { } + @Override public void eval() { final int len = in.end - in.start; - int num = nTimes.value; + final int num = nTimes.value; out.start = 0; out.buffer = buffer = buffer.reallocIfNeeded( len * num ); for (int i =0; i < num; i++) { @@ -1194,24 +1227,25 @@ public void eval() { */ @FunctionTemplate(name = "toascii", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) public static class AsciiEndode implements DrillSimpleFunc { - @Param VarCharHolder in; @Param VarCharHolder enc; @Output VarCharHolder out; @Workspace Charset inCharset; @Inject DrillBuf buffer; + @Override public void setup() { inCharset = java.nio.charset.Charset.forName(org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(enc.start, enc.end, enc.buffer)); } + @Override public void eval() { - byte[] bytea = new byte[in.end - in.start]; - int index =0; - for (int i = in.start; i 0) { @@ -1254,5 +1289,4 @@ public void eval() { } } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java index c953bb376d1..e0f1eab3a13 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java @@ -21,8 +21,6 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.exec.memory.BufferAllocator; -import java.io.Closeable; - /** * Manages a list of {@link DrillBuf}s that can be reallocated as needed. Upon * re-allocation the old buffer will be freed. Managing a list of these buffers @@ -38,7 +36,6 @@ * and {@link QueryContext}. */ public class BufferManager implements AutoCloseable { - private LongObjectOpenHashMap managedBuffers = new LongObjectOpenHashMap<>(); private final BufferAllocator allocator; @@ -51,11 +48,12 @@ public BufferManager(BufferAllocator allocator, FragmentContext fragmentContext) this.fragmentContext = fragmentContext; } + @Override public void close() throws Exception { - Object[] mbuffers = ((LongObjectOpenHashMap)(Object)managedBuffers).values; - for (int i =0; i < mbuffers.length; i++) { + final Object[] mbuffers = ((LongObjectOpenHashMap) (Object) managedBuffers).values; + for (int i = 0; i < mbuffers.length; i++) { if (managedBuffers.allocated[i]) { - ((DrillBuf)mbuffers[i]).release(); + ((DrillBuf) mbuffers[i]).release(1); } } managedBuffers.clear(); @@ -65,7 +63,7 @@ public DrillBuf replace(DrillBuf old, int newSize) { if (managedBuffers.remove(old.memoryAddress()) == null) { throw new IllegalStateException("Tried to remove unmanaged buffer."); } - old.release(); + old.release(1); return getManagedBuffer(newSize); } @@ -80,5 +78,4 @@ public DrillBuf getManagedBuffer(int size) { newBuf.setBufferManager(this); return newBuf; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 66558befe04..98b28763d29 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.impl; -import java.io.Closeable; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.LinkedList; @@ -44,10 +43,9 @@ * Create RecordBatch tree (PhysicalOperator implementations) for a given PhysicalOperator tree. */ public class ImplCreator { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class); - private RootExec root = null; - private LinkedList operators = Lists.newLinkedList(); + private final LinkedList operators = Lists.newLinkedList(); private ImplCreator() {} @@ -109,6 +107,7 @@ private RootExec getRootExec(final FragmentRoot root, final FragmentContext cont final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(root.getUserName(), context.getQueryUserName()); try { return proxyUgi.doAs(new PrivilegedExceptionAction() { + @Override public RootExec run() throws Exception { return ((RootCreator) getOpCreator(root, context)).getRoot(context, root, childRecordBatches); } @@ -134,6 +133,7 @@ private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentCont final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(op.getUserName(), context.getQueryUserName()); try { return proxyUgi.doAs(new PrivilegedExceptionAction() { + @Override public RecordBatch run() throws Exception { final CloseableRecordBatch batch = ((BatchCreator) getOpCreator(op, context)).getBatch( context, op, childRecordBatches); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 363205c0b9d..97b8d97ea35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -38,19 +38,20 @@ import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; -public class ScreenCreator implements RootCreator{ - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class); +public class ScreenCreator implements RootCreator { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScreenCreator.class); @Override - public RootExec getRoot(FragmentContext context, Screen config, List children) throws ExecutionSetupException { + public RootExec getRoot(FragmentContext context, Screen config, List children) + throws ExecutionSetupException { Preconditions.checkNotNull(children); Preconditions.checkArgument(children.size() == 1); return new ScreenRoot(context, children.iterator().next(), config); } - static class ScreenRoot extends BaseRootExec { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class); private final RecordBatch incoming; private final FragmentContext context; private final AccountingUserConnection userConnection; @@ -71,7 +72,7 @@ public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) super(context, config); this.context = context; this.incoming = incoming; - this.userConnection = context.getUserDataTunnel(); + userConnection = context.getUserDataTunnel(); } @Override @@ -87,10 +88,10 @@ public boolean innerNext() { if (firstBatch) { // this is the only data message sent to the client and may contain the schema QueryWritableBatch batch; - QueryData header = QueryData.newBuilder() // - .setQueryId(context.getHandle().getQueryId()) // - .setRowCount(0) // - .setDef(RecordBatchDef.getDefaultInstance()) // + QueryData header = QueryData.newBuilder() + .setQueryId(context.getHandle().getQueryId()) + .setRowCount(0) + .setDef(RecordBatchDef.getDefaultInstance()) .build(); batch = new QueryWritableBatch(header); @@ -109,7 +110,7 @@ public boolean innerNext() { //$FALL-THROUGH$ case OK: injector.injectPause(context.getExecutionControls(), "sending-data", logger); - QueryWritableBatch batch = materializer.convertNext(); + final QueryWritableBatch batch = materializer.convertNext(); updateStats(batch); stats.startWait(); try { @@ -133,13 +134,10 @@ RecordBatch getIncoming() { return incoming; } - @Override public void close() throws Exception { injector.injectPause(context.getExecutionControls(), "send-complete", logger); super.close(); } } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index f38275910a7..248c8da0de2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -52,7 +52,6 @@ public static class SingleSenderRootExec extends BaseRootExec { private RecordBatch incoming; private AccountingDataTunnel tunnel; private FragmentHandle handle; - private SingleSender config; private int recMajor; private volatile boolean ok = true; private volatile boolean done = false; @@ -69,11 +68,10 @@ public int metricId() { public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { super(context, context.newOperatorContext(config, null, false), config); this.incoming = batch; - assert(incoming != null); - this.handle = context.getHandle(); - this.config = config; - this.recMajor = config.getOppositeMajorFragmentId(); - this.tunnel = context.getDataTunnel(config.getDestination()); + assert incoming != null; + handle = context.getHandle(); + recMajor = config.getOppositeMajorFragmentId(); + tunnel = context.getDataTunnel(config.getDestination()); oppositeHandle = handle.toBuilder() .setMajorFragmentId(config.getOppositeMajorFragmentId()) .setMinorFragmentId(config.getOppositeMinorFragmentId()) @@ -104,10 +102,10 @@ public boolean innerNext() { case STOP: case NONE: // if we didn't do anything yet, send an empty schema. - final BatchSchema sendSchema = incoming.getSchema() == null ? BatchSchema.newBuilder().build() : incoming - .getSchema(); + final BatchSchema sendSchema = incoming.getSchema() == null ? + BatchSchema.newBuilder().build() : incoming.getSchema(); - FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), + final FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), sendSchema); stats.startWait(); @@ -120,8 +118,10 @@ public boolean innerNext() { case OK_NEW_SCHEMA: case OK: - FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), - handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), incoming.getWritableBatch()); + final FragmentWritableBatch batch = new FragmentWritableBatch( + false, handle.getQueryId(), handle.getMajorFragmentId(), + handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), + incoming.getWritableBatch()); updateStats(batch); stats.startWait(); try { @@ -146,5 +146,4 @@ public void receivingFragmentFinished(FragmentHandle handle) { done = true; } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java index 7e22e659a13..5cdfc5d37df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java @@ -38,10 +38,10 @@ import com.google.common.base.Stopwatch; public abstract class PriorityQueueTemplate implements PriorityQueue { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class); - private SelectionVector4 heapSv4;//This holds the heap - private SelectionVector4 finalSv4;//This is for final sorted output + private SelectionVector4 heapSv4; //This holds the heap + private SelectionVector4 finalSv4; //This is for final sorted output private ExpandableHyperContainer hyperBatch; private FragmentContext context; private BufferAllocator allocator; @@ -60,6 +60,7 @@ public void init(int limit, FragmentContext context, BufferAllocator allocator, this.hasSv2 = hasSv2; } + @Override public void resetQueue(VectorContainer container, SelectionVector4 v4) throws SchemaChangeException { assert container.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE; BatchSchema schema = container.getSchema(); @@ -69,10 +70,10 @@ public void resetQueue(VectorContainer container, SelectionVector4 v4) throws Sc newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors()); } newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); - this.hyperBatch = new ExpandableHyperContainer(newContainer); - this.batchCount = hyperBatch.iterator().next().getValueVectors().length; + hyperBatch = new ExpandableHyperContainer(newContainer); + batchCount = hyperBatch.iterator().next().getValueVectors().length; final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1)); - this.heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE); + heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE); for (int i = 0; i < v4.getTotalCount(); i++) { heapSv4.set(i, v4.get(i)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 0a80b27ace8..34c514ed317 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -48,14 +48,14 @@ public class WriterRecordBatch extends AbstractRecordBatch { private long counter = 0; private final RecordBatch incoming; private boolean processed = false; - private String fragmentUniqueId; + private final String fragmentUniqueId; private BatchSchema schema; public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException { super(writer, context, false); this.incoming = incoming; - FragmentHandle handle = context.getHandle(); + final FragmentHandle handle = context.getHandle(); fragmentUniqueId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); this.recordWriter = recordWriter; } @@ -77,7 +77,7 @@ public BatchSchema getSchema() { @Override public IterOutcome innerNext() { - if(processed) { + if (processed) { // cleanup(); // if the upstream record batch is already processed and next() is called by // downstream then return NONE to indicate completion @@ -86,7 +86,7 @@ public IterOutcome innerNext() { // process the complete upstream in one next() call IterOutcome upstream; - try{ + try { do { upstream = next(incoming); @@ -101,12 +101,12 @@ public IterOutcome innerNext() { case OK_NEW_SCHEMA: setupNewSchema(); - // fall through. + // $FALL-THROUGH$ case OK: counter += eventBasedRecordWriter.write(incoming.getRecordCount()); logger.debug("Total records written so far: {}", counter); - for(VectorWrapper v : incoming) { + for(final VectorWrapper v : incoming) { v.getValueVector().clear(); } break; @@ -130,14 +130,15 @@ public IterOutcome innerNext() { return IterOutcome.OK_NEW_SCHEMA; } - private void addOutputContainerData(){ - VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById( // - VarCharVector.class, // - container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds() // - ).getValueVector(); + private void addOutputContainerData() { + final VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById( + VarCharVector.class, + container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds()) + .getValueVector(); AllocationHelper.allocate(fragmentIdVector, 1, 50); - BigIntVector summaryVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class, - container.getValueVectorId(SchemaPath.getSimplePath("Number of records written")).getFieldIds()).getValueVector(); + final BigIntVector summaryVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class, + container.getValueVectorId(SchemaPath.getSimplePath("Number of records written")).getFieldIds()) + .getValueVector(); AllocationHelper.allocate(summaryVector, 1, 8); fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes()); fragmentIdVector.getMutator().setValueCount(1); @@ -155,13 +156,16 @@ protected void setupNewSchema() throws IOException { // Create two vectors for: // 1. Fragment unique id. // 2. Summary: currently contains number of records written. - MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR)); - MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT)); + final MaterializedField fragmentIdField = + MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR)); + final MaterializedField summaryField = + MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), + Types.required(MinorType.BIGINT)); container.addOrGet(fragmentIdField); container.addOrGet(summaryField); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - } finally{ + } finally { stats.stopSetup(); } @@ -186,5 +190,4 @@ public void close() { closeWriter(); super.close(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 5a26134e689..2ab1e661df6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -113,7 +113,7 @@ public void buildSchema() throws SchemaChangeException { if (!createAggregator()) { state = BatchState.DONE; } - for (VectorWrapper w : container) { + for (final VectorWrapper w : container) { w.getValueVector().allocateNew(); } } @@ -206,8 +206,8 @@ public IterOutcome innerNext() { */ private void constructSpecialBatch() { int exprIndex = 0; - for (VectorWrapper vw: container) { - ValueVector vv = vw.getValueVector(); + for (final VectorWrapper vw: container) { + final ValueVector vv = vw.getValueVector(); AllocationHelper.allocateNew(vv, SPECIAL_BATCH_COUNT); vv.getMutator().setValueCount(SPECIAL_BATCH_COUNT); if (vv.getField().getType().getMode() == TypeProtos.DataMode.REQUIRED) { @@ -265,20 +265,20 @@ private StreamingAggregator createAggregatorInternal() throws SchemaChangeExcept ErrorCollector collector = new ErrorCollectorImpl(); - for (int i =0; i < keyExprs.length; i++) { - NamedExpression ne = popConfig.getKeys()[i]; + for (int i = 0; i < keyExprs.length; i++) { + final NamedExpression ne = popConfig.getKeys()[i]; final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() ); if (expr == null) { continue; } keyExprs[i] = expr; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); - ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); keyOutputIds[i] = container.add(vector); } - for (int i =0; i < valueExprs.length; i++) { - NamedExpression ne = popConfig.getExprs()[i]; + for (int i = 0; i < valueExprs.length; i++) { + final NamedExpression ne = popConfig.getExprs()[i]; final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); if (expr == null) { continue; @@ -315,17 +315,17 @@ private StreamingAggregator createAggregatorInternal() throws SchemaChangeExcept private void setupIsSame(ClassGenerator cg, LogicalExpression[] keyExprs) { cg.setMappingSet(IS_SAME_I1); - for (LogicalExpression expr : keyExprs) { + for (final LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. cg.setMappingSet(IS_SAME_I1); - HoldingContainer first = cg.addExpr(expr, false); + final HoldingContainer first = cg.addExpr(expr, false); cg.setMappingSet(IS_SAME_I2); - HoldingContainer second = cg.addExpr(expr, false); + final HoldingContainer second = cg.addExpr(expr, false); - LogicalExpression fh = + final LogicalExpression fh = FunctionGenerationHelper .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry()); - HoldingContainer out = cg.addExpr(fh, false); + final HoldingContainer out = cg.addExpr(fh, false); cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); } cg.getEvalBlock()._return(JExpr.TRUE); @@ -338,17 +338,17 @@ private void setupIsSame(ClassGenerator cg, LogicalExpressi private void setupIsSameApart(ClassGenerator cg, LogicalExpression[] keyExprs) { cg.setMappingSet(ISA_B1); - for (LogicalExpression expr : keyExprs) { + for (final LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. cg.setMappingSet(ISA_B1); - HoldingContainer first = cg.addExpr(expr, false); + final HoldingContainer first = cg.addExpr(expr, false); cg.setMappingSet(ISA_B2); - HoldingContainer second = cg.addExpr(expr, false); + final HoldingContainer second = cg.addExpr(expr, false); - LogicalExpression fh = + final LogicalExpression fh = FunctionGenerationHelper .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry()); - HoldingContainer out = cg.addExpr(fh, false); + final HoldingContainer out = cg.addExpr(fh, false); cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); } cg.getEvalBlock()._return(JExpr.TRUE); @@ -360,8 +360,8 @@ private void setupIsSameApart(ClassGenerator cg, LogicalExp private void addRecordValues(ClassGenerator cg, LogicalExpression[] valueExprs) { cg.setMappingSet(EVAL); - for (LogicalExpression ex : valueExprs) { - HoldingContainer hc = cg.addExpr(ex); + for (final LogicalExpression ex : valueExprs) { + final HoldingContainer hc = cg.addExpr(ex); } } @@ -369,8 +369,8 @@ private void addRecordValues(ClassGenerator cg, LogicalExpr private void outputRecordKeys(ClassGenerator cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { cg.setMappingSet(RECORD_KEYS); - for (int i =0; i < keyExprs.length; i++) { - HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true)); + for (int i = 0; i < keyExprs.length; i++) { + final HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true)); } } @@ -383,15 +383,14 @@ private void outputRecordKeys(ClassGenerator cg, TypedField private void outputRecordKeysPrev(ClassGenerator cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { cg.setMappingSet(RECORD_KEYS_PREV); - for (int i =0; i < keyExprs.length; i++) { + for (int i = 0; i < keyExprs.length; i++) { // IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same. This is possible because InternalBatch guarantees this. logger.debug("Writing out expr {}", keyExprs[i]); cg.rotateBlock(); cg.setMappingSet(RECORD_KEYS_PREV); - HoldingContainer innerExpression = cg.addExpr(keyExprs[i], false); + final HoldingContainer innerExpression = cg.addExpr(keyExprs[i], false); cg.setMappingSet(RECORD_KEYS_PREV_OUT); - HoldingContainer outerExpression = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), false); - + final HoldingContainer outerExpression = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), false); } } @@ -428,5 +427,4 @@ public void close() { protected void killIncoming(boolean sendUpstream) { incoming.kill(sendUpstream); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index c6a07f8aa37..98db0cdba59 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -41,8 +41,7 @@ * to all nodes is cheaper than merging and computing all the joins in the same node. */ public class BroadcastSenderRootExec extends BaseRootExec { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class); - private final FragmentContext context; + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class); private final BroadcastSender config; private final int[][] receivingMinorFragments; private final AccountingDataTunnel[] tunnels; @@ -64,7 +63,6 @@ public BroadcastSenderRootExec(FragmentContext context, BroadcastSender config) throws OutOfMemoryException { super(context, context.newOperatorContext(config, null, false), config); this.ok = true; - this.context = context; this.incoming = incoming; this.config = config; this.handle = context.getHandle(); @@ -80,7 +78,7 @@ public BroadcastSenderRootExec(FragmentContext context, this.tunnels = new AccountingDataTunnel[destCount]; this.receivingMinorFragments = new int[destCount][]; - for(DrillbitEndpoint ep : dests.keySet()){ + for(final DrillbitEndpoint ep : dests.keySet()){ List minorsList= dests.get(ep); int[] minorsArray = new int[minorsList.size()]; int x = 0; @@ -154,5 +152,4 @@ public void updateStats(FragmentWritableBatch writableBatch) { stats.setLongStat(Metric.N_RECEIVERS, tunnels.length); stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index c1d78c3746e..432e06b8e8d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -28,7 +28,6 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Filter; @@ -44,11 +43,10 @@ import com.google.common.collect.Lists; public class FilterRecordBatch extends AbstractSingleRecordBatch{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class); + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class); private SelectionVector2 sv2; private SelectionVector4 sv4; - private BufferAllocator.PreAllocator svAllocator; private Filterer filter; public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { @@ -84,7 +82,6 @@ protected IterOutcome doWork() { return IterOutcome.OK; } - @Override public void close() { if (sv2 != null) { @@ -152,15 +149,9 @@ protected Filterer generateSV4Filterer() throws SchemaChangeException { cg.addExpr(new ReturnValueExpression(expr)); -// for (VectorWrapper i : incoming) { -// ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator()); -// container.add(v); -// allocators.add(getAllocator4(v)); -// } - - for (VectorWrapper vw : incoming) { - for (ValueVector vv : vw.getValueVectors()) { - TransferPair pair = vv.getTransferPair(); + for (final VectorWrapper vw : incoming) { + for (final ValueVector vv : vw.getValueVectors()) { + final TransferPair pair = vv.getTransferPair(); container.add(pair.getTo()); transfers.add(pair); } @@ -170,8 +161,8 @@ protected Filterer generateSV4Filterer() throws SchemaChangeException { container.buildSchema(SelectionVectorMode.FOUR_BYTE); try { - TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); - Filterer filter = context.getImplementationClass(cg); + final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); + final Filterer filter = context.getImplementationClass(cg); filter.setup(context, incoming, this, tx); return filter; } catch (ClassTransformationException | IOException e) { @@ -192,21 +183,18 @@ protected Filterer generateSV2Filterer() throws SchemaChangeException { cg.addExpr(new ReturnValueExpression(expr)); - for (VectorWrapper v : incoming) { - TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack)); + for (final VectorWrapper v : incoming) { + final TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack)); transfers.add(pair); } - try { - TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); - Filterer filter = context.getImplementationClass(cg); + final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); + final Filterer filter = context.getImplementationClass(cg); filter.setup(context, incoming, this, tx); return filter; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); } - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 73f34356869..21ebd9ac225 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -63,7 +63,6 @@ import com.sun.codemodel.JVar; public class HashJoinBatch extends AbstractRecordBatch { - public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024; public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000; @@ -186,8 +185,8 @@ protected void buildSchema() throws SchemaChangeException { hjHelper = new HashJoinHelper(context, oContext.getAllocator()); try { rightSchema = right.getSchema(); - VectorContainer vectors = new VectorContainer(oContext); - for (VectorWrapper w : right) { + final VectorContainer vectors = new VectorContainer(oContext); + for (final VectorWrapper w : right) { vectors.addOrGet(w.getField()); } vectors.buildSchema(SelectionVectorMode.NONE); @@ -198,7 +197,7 @@ protected void buildSchema() throws SchemaChangeException { setupHashTable(); hashJoinProbe = setupHashJoinProbe(); // Build the container schema and set the counts - for (VectorWrapper w : container) { + for (final VectorWrapper w : container) { w.getValueVector().allocateNew(); } container.buildSchema(BatchSchema.SelectionVectorMode.NONE); @@ -243,8 +242,7 @@ public IterOutcome innerNext() { state = BatchState.NOT_FIRST; } - - for (VectorWrapper v : container) { + for (final VectorWrapper v : container) { v.getValueVector().getMutator().setValueCount(outputRecords); } @@ -253,13 +251,13 @@ public IterOutcome innerNext() { } else { // Our build side is empty, we won't have any matches, clear the probe side if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (VectorWrapper wrapper : left) { + for (final VectorWrapper wrapper : left) { wrapper.getValueVector().clear(); } left.kill(true); leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (VectorWrapper wrapper : left) { + for (final VectorWrapper wrapper : left) { wrapper.getValueVector().clear(); } leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); @@ -281,11 +279,9 @@ public IterOutcome innerNext() { } public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException { - // Setup the hash table configuration object int conditionsSize = conditions.size(); - - NamedExpression rightExpr[] = new NamedExpression[conditionsSize]; + final NamedExpression rightExpr[] = new NamedExpression[conditionsSize]; NamedExpression leftExpr[] = new NamedExpression[conditionsSize]; JoinComparator comparator = JoinComparator.NONE; @@ -299,7 +295,7 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra } assert comparator != JoinComparator.NONE; - boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false; + final boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false; // Set the left named expression to be null if the probe batch is empty. if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) { @@ -310,24 +306,23 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra } } - HashTableConfig htConfig = + final HashTableConfig htConfig = new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr); // Create the chained hash table - ChainedHashTable ht = + final ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null, areNullsEqual); hashTable = ht.createAndSetupHashTable(null); } public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException { - //Setup the underlying hash table // skip first batch if count is zero, as it may be an empty schema batch if (right.getRecordCount() == 0) { - for (VectorWrapper w : right) { + for (final VectorWrapper w : right) { w.clear(); } rightUpstream = next(right); @@ -336,9 +331,7 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio boolean moreData = true; while (moreData) { - switch (rightUpstream) { - case OUT_OF_MEMORY: case NONE: case NOT_YET: @@ -362,7 +355,7 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio } // Fall through case OK: - int currentRecordCount = right.getRecordCount(); + final int currentRecordCount = right.getRecordCount(); /* For every new build batch, we store some state in the helper context * Add new state to the helper context @@ -370,11 +363,10 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio hjHelper.addNewBatch(currentRecordCount); // Holder contains the global index where the key is hashed into using the hash table - IndexPointer htIndex = new IndexPointer(); + final IndexPointer htIndex = new IndexPointer(); // For every record in the build batch , hash the key columns for (int i = 0; i < currentRecordCount; i++) { - hashTable.put(i, htIndex, 1 /* retry count */); /* Use the global index returned by the hash table, to store @@ -388,7 +380,7 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio * to the hyper vector container. Will be used when we want to retrieve * records that have matching keys on the probe side. */ - RecordBatchData nextBatch = new RecordBatchData(right); + final RecordBatchData nextBatch = new RecordBatchData(right); boolean success = false; try { if (hyperContainer == null) { @@ -412,27 +404,22 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio } } - public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { - - final CodeGenerator cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - ClassGenerator g = cg.getRoot(); + final ClassGenerator g = cg.getRoot(); // Generate the code to project build side records g.setMappingSet(projectBuildMapping); - int fieldId = 0; - JExpression buildIndex = JExpr.direct("buildIndex"); - JExpression outIndex = JExpr.direct("outIndex"); + final JExpression buildIndex = JExpr.direct("buildIndex"); + final JExpression outIndex = JExpr.direct("outIndex"); g.rotateBlock(); if (rightSchema != null) { - for (MaterializedField field : rightSchema) { - - MajorType inputType = field.getType(); - MajorType outputType; + for (final MaterializedField field : rightSchema) { + final MajorType inputType = field.getType(); + final MajorType outputType; // If left or full outer join, then the output type must be nullable. However, map types are // not nullable so we must exclude them from the check below (see DRILL-2197). if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED @@ -447,8 +434,8 @@ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, I // Add the vector to our output container container.addOrGet(projected); - JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId)); + final JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId)); + final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId)); g.getEvalBlock().add(outVV.invoke("copyFromSafe") .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) .arg(outIndex) @@ -463,14 +450,12 @@ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, I int outputFieldId = fieldId; fieldId = 0; - JExpression probeIndex = JExpr.direct("probeIndex"); - int recordCount = 0; + final JExpression probeIndex = JExpr.direct("probeIndex"); if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) { - for (VectorWrapper vv : left) { - - MajorType inputType = vv.getField().getType(); - MajorType outputType; + for (final VectorWrapper vv : left) { + final MajorType inputType = vv.getField().getType(); + final MajorType outputType; // If right or full outer join then the output type should be optional. However, map types are // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197). @@ -481,30 +466,28 @@ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, I outputType = inputType; } - ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType)); + final ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType)); if (v instanceof AbstractContainerVector) { vv.getValueVector().makeTransferPair(v); v.clear(); } - JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId)); + final JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId)); + final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId)); g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV)); fieldId++; outputFieldId++; } - recordCount = left.getRecordCount(); } - HashJoinProbe hj = context.getImplementationClass(cg); - + final HashJoinProbe hj = context.getImplementationClass(cg); return hj; } private void allocateVectors() { - for (VectorWrapper v : container) { + for (final VectorWrapper v : container) { v.getValueVector().allocateNew(); } } @@ -514,8 +497,8 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch super(popConfig, context, true); this.left = left; this.right = right; - this.joinType = popConfig.getJoinType(); - this.conditions = popConfig.getConditions(); + joinType = popConfig.getJoinType(); + conditions = popConfig.getConditions(); } private void updateStats(HashTable htable) { @@ -523,16 +506,16 @@ private void updateStats(HashTable htable) { return; } htable.getStats(htStats); - this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); - this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); - this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing); - this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime); + stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); + stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); + stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing); + stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime); } @Override public void killIncoming(boolean sendUpstream) { - this.left.kill(sendUpstream); - this.right.kill(sendUpstream); + left.kill(sendUpstream); + right.kill(sendUpstream); } @Override @@ -551,5 +534,4 @@ public void close() { } super.close(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index 2d37fa56951..8f2dad39461 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -39,9 +39,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.server.options.DrillConfigIterator.Iter; import org.apache.drill.exec.vector.AllocationHelper; - import com.google.common.base.Preconditions; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; @@ -183,7 +181,7 @@ public IterOutcome innerNext() { outputRecords = nljWorker.outputRecords(); // Set the record count - for (VectorWrapper vw : container) { + for (final VectorWrapper vw : container) { vw.getValueVector().getMutator().setValueCount(outputRecords); } @@ -202,7 +200,7 @@ private void killAndDrainRight() { } right.kill(true); while (hasMore(rightUpstream)) { - for (VectorWrapper wrapper : right) { + for (final VectorWrapper wrapper : right) { wrapper.getValueVector().clear(); } rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right); @@ -280,7 +278,7 @@ private NestedLoopJoin setupWorker() throws IOException, ClassTransformationExce * Simple method to allocate space for all the vectors in the container. */ private void allocateVectors() { - for (VectorWrapper vw : container) { + for (final VectorWrapper vw : container) { AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE); } } @@ -309,7 +307,7 @@ protected void buildSchema() throws SchemaChangeException { if (leftUpstream != IterOutcome.NONE) { leftSchema = left.getSchema(); - for (VectorWrapper vw : left) { + for (final VectorWrapper vw : left) { container.addOrGet(vw.getField()); } @@ -321,7 +319,7 @@ protected void buildSchema() throws SchemaChangeException { if (rightUpstream != IterOutcome.NONE) { rightSchema = right.getSchema(); - for (VectorWrapper vw : right) { + for (final VectorWrapper vw : right) { container.addOrGet(vw.getField()); } addBatchToHyperContainer(right); @@ -341,7 +339,7 @@ protected void buildSchema() throws SchemaChangeException { } private void addBatchToHyperContainer(RecordBatch inputBatch) { - RecordBatchData batchCopy = new RecordBatchData(inputBatch); + final RecordBatchData batchCopy = new RecordBatchData(inputBatch); boolean success = false; try { rightCounts.addLast(inputBatch.getRecordCount()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 4ea5a5cc6de..06dd6998f85 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -33,19 +33,19 @@ import com.google.common.collect.Lists; public class LimitRecordBatch extends AbstractSingleRecordBatch { - -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); private SelectionVector2 outgoingSv; private SelectionVector2 incomingSv; private int recordsToSkip; private int recordsLeft; - private boolean noEndLimit; + private final boolean noEndLimit; private boolean skipBatch; private boolean first = true; - List transfers = Lists.newArrayList(); + private final List transfers = Lists.newArrayList(); - public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { + public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) + throws OutOfMemoryException { super(popConfig, context, incoming); outgoingSv = new SelectionVector2(oContext.getAllocator()); recordsToSkip = popConfig.getFirst(); @@ -62,14 +62,15 @@ protected boolean setupNewSchema() throws SchemaChangeException { transfers.clear(); - for(VectorWrapper v : incoming){ - TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack)); + for(final VectorWrapper v : incoming) { + final TransferPair pair = v.getValueVector().makeTransferPair( + container.addOrGet(v.getField(), callBack)); transfers.add(pair); } - BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); + final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); - switch(svMode){ + switch(svMode) { case NONE: break; case TWO_BYTE: @@ -98,7 +99,6 @@ public IterOutcome innerNext() { } while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) { - // Clear the memory for the incoming batch for (VectorWrapper wrapper : incoming) { wrapper.getValueVector().clear(); @@ -126,15 +126,15 @@ protected IterOutcome doWork() { first = false; } skipBatch = false; - int recordCount = incoming.getRecordCount(); + final int recordCount = incoming.getRecordCount(); if (recordCount == 0) { skipBatch = true; return IterOutcome.OK; } - for(TransferPair tp : transfers) { + for(final TransferPair tp : transfers) { tp.transfer(); } - if(recordCount <= recordsToSkip) { + if (recordCount <= recordsToSkip) { recordsToSkip -= recordCount; skipBatch = true; } else { @@ -149,8 +149,9 @@ protected IterOutcome doWork() { return IterOutcome.OK; } + // These two functions are identical except for the computation of the index; merge private void limitWithNoSV(int recordCount) { - int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip)); + final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip)); recordsToSkip -= offset; int fetch; @@ -162,15 +163,14 @@ private void limitWithNoSV(int recordCount) { } int svIndex = 0; - for(char i = (char) offset; i < fetch; i++) { + for(char i = (char) offset; i < fetch; svIndex++, i++) { outgoingSv.setIndex(svIndex, i); - svIndex++; } outgoingSv.setRecordCount(svIndex); } private void limitWithSV(int recordCount) { - int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip)); + final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip)); recordsToSkip -= offset; int fetch; @@ -182,10 +182,9 @@ private void limitWithSV(int recordCount) { } int svIndex = 0; - for(int i = offset; i < fetch; i++) { - char index = incomingSv.getIndex(i); + for(int i = offset; i < fetch; svIndex++, i++) { + final char index = incomingSv.getIndex(i); outgoingSv.setIndex(svIndex, index); - svIndex++; } outgoingSv.setRecordCount(svIndex); } @@ -196,9 +195,8 @@ public int getRecordCount() { } @Override - public void close(){ + public void close() { outgoingSv.clear(); super.close(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 0050b45bd59..3061f99ae97 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -159,8 +159,8 @@ public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incom @Override public void close() { super.close(); - this.partitionVectors.clear(); - this.partitionKeyVector.clear(); + partitionVectors.clear(); + partitionKeyVector.clear(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 31fc1609a24..6e49e78c398 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -58,8 +58,7 @@ import com.sun.codemodel.JType; public class PartitionSenderRootExec extends BaseRootExec { - - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); private RecordBatch incoming; private HashPartitionSender operator; private PartitionerDecorator partitioner; @@ -105,10 +104,10 @@ public PartitionSenderRootExec(FragmentContext context, this.incoming = incoming; this.operator = operator; this.context = context; - this.outGoingBatchCount = operator.getDestinations().size(); - this.popConfig = operator; - this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount); - this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount); + outGoingBatchCount = operator.getDestinations().size(); + popConfig = operator; + remainingReceivers = new AtomicIntegerArray(outGoingBatchCount); + remaingReceiverCount = new AtomicInteger(outGoingBatchCount); stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount); // Algorithm to figure out number of threads to parallelize output // numberOfRows/sliceTarget/numReceivers/threadfactor @@ -137,7 +136,6 @@ public PartitionSenderRootExec(FragmentContext context, @Override public boolean innerNext() { - if (!ok) { return false; } @@ -332,6 +330,7 @@ public void receivingFragmentFinished(FragmentHandle handle) { } } + @Override public void close() throws Exception { logger.debug("Partition sender stopping."); super.close(); @@ -340,7 +339,6 @@ public void close() throws Exception { updateAggregateStats(); partitioner.clear(); } - } public void sendEmptyBatch(boolean isLast) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index b9a16413363..98ee32022d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -108,7 +108,7 @@ private boolean load(final RecordBatchData batch) { return false; } else { container.clear(); - for (final VectorWrapper w : newContainer) { + for (final VectorWrapper w : newContainer) { container.add(w.getValueVector()); } container.buildSchema(SelectionVectorMode.NONE); @@ -118,7 +118,6 @@ private boolean load(final RecordBatchData batch) { } private class Producer implements Runnable { - RecordBatchDataWrapper wrapper; @Override @@ -206,7 +205,7 @@ public void close() { cleanUpLatch.await(); } catch (final InterruptedException e) { logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e); - // TODO InterruptedException + // TODO we should retry to wait for the latch } finally { super.close(); clearQueue(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 407f05d8664..cebefa52680 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -92,9 +92,9 @@ public IterOutcome innerNext() { if (schema != null) { if (getSelectionVector4().next()) { return IterOutcome.OK; - } else { - return IterOutcome.NONE; } + + return IterOutcome.NONE; } try{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index b5b1b0afe2a..cb04244fb81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -185,7 +185,7 @@ private void handleRemainder() { } @Override - public void close(){ + public void close() { super.close(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index f1da1db98a5..701ead5f3c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -171,7 +171,7 @@ public IterOutcome innerNext() { @Override protected void buildSchema() throws SchemaChangeException { logger.trace("buildSchema()"); - IterOutcome outcome = next(incoming); + final IterOutcome outcome = next(incoming); switch (outcome) { case NONE: state = BatchState.DONE; @@ -208,7 +208,7 @@ private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeExc container.clear(); // all existing vectors will be transferred to the outgoing container in framer.doWork() - for (VectorWrapper wrapper : batch) { + for (final VectorWrapper wrapper : batch) { container.addOrGet(wrapper.getField()); } @@ -292,10 +292,10 @@ private void setupIsFunction(final ClassGenerator cg, final List batchGroupLis g.setMappingSet(MAIN_MAPPING); copier = context.getImplementationClass(cg); } else { - copier.cleanup(); + copier.close(); } BufferAllocator allocator = spilling ? copierAllocator : oContext.getAllocator(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 37529ffb291..d42e8d483d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -37,14 +37,13 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Queues; -public abstract class MSortTemplate implements MSorter, IndexedSortable{ +public abstract class MSortTemplate implements MSorter, IndexedSortable { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class); private SelectionVector4 vector4; private SelectionVector4 aux; private long compares; private Queue runStarts = Queues.newLinkedBlockingQueue(); - private Queue newRunStarts; private FragmentContext context; /** @@ -67,7 +66,7 @@ public void setup(final FragmentContext context, final BufferAllocator allocator final int newBatch = this.vector4.get(i) >>> 16; if (newBatch == batch) { continue; - } else if(newBatch == batch + 1) { + } else if (newBatch == batch + 1) { runStarts.add(i); batch = newBatch; } else { @@ -135,7 +134,7 @@ public void sort(final VectorContainer container) { } int outIndex = 0; - newRunStarts = Queues.newLinkedBlockingQueue(); + final Queue newRunStarts = Queues.newLinkedBlockingQueue(); newRunStarts.add(outIndex); final int size = runStarts.size(); for (int i = 0; i < size / 2; i++) { @@ -155,9 +154,9 @@ public void sort(final VectorContainer container) { } final SelectionVector4 tmp = aux.createNewWrapperCurrent(desiredRecordBatchCount); aux.clear(); - aux = this.vector4.createNewWrapperCurrent(desiredRecordBatchCount); + aux = vector4.createNewWrapperCurrent(desiredRecordBatchCount); vector4.clear(); - this.vector4 = tmp.createNewWrapperCurrent(desiredRecordBatchCount); + vector4 = tmp.createNewWrapperCurrent(desiredRecordBatchCount); tmp.clear(); runStarts = newRunStarts; } @@ -198,5 +197,4 @@ public void clear() { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing); public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex); - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java index 161ca6a26aa..e0d9c2d6af2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java @@ -25,15 +25,18 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorAccessible; -public interface PriorityQueueCopier { - public static long initialAllocation = 10000000; - public static long maxAllocation = 20000000; +public interface PriorityQueueCopier extends AutoCloseable { + public static final long INITIAL_ALLOCATION = 10000000; + public static final long MAX_ALLOCATION = 20000000; + + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, + List batchGroups, VectorAccessible outgoing) throws SchemaChangeException; - public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List batchGroups, - VectorAccessible outgoing) throws SchemaChangeException; public int next(int targetRecordCount); - public void cleanup(); - public static TemplateClassDefinition TEMPLATE_DEFINITION = new TemplateClassDefinition(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class); + public final static TemplateClassDefinition TEMPLATE_DEFINITION = + new TemplateClassDefinition<>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class); + @Override + abstract public void close(); // specify this to leave out the Exception } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java index facf1929fc1..891907a16dd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java @@ -66,7 +66,7 @@ public int next(int targetRecordCount) { allocateVectors(targetRecordCount); for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) { if (queueSize == 0) { - cleanup(); + close(); return 0; } int compoundIndex = vector4.get(0); @@ -96,12 +96,12 @@ private void setValueCount(int count) { } @Override - public void cleanup() { + public void close() { vector4.clear(); - for (VectorWrapper w: outgoing) { + for (final VectorWrapper w: outgoing) { w.getValueVector().clear(); } - for (VectorWrapper w : hyperBatch) { + for (final VectorWrapper w : hyperBatch) { w.clear(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 873173915f9..d8f703ed485 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -32,7 +32,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; public abstract class AbstractRecordBatch implements CloseableRecordBatch { - final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass()); protected final VectorContainer container; protected final T popConfig; @@ -51,8 +51,7 @@ protected AbstractRecordBatch(final T popConfig, final FragmentContext context, } protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, - final OperatorContext oContext) throws OutOfMemoryException { - super(); + final OperatorContext oContext) { this.context = context; this.popConfig = popConfig; this.oContext = oContext; @@ -119,6 +118,7 @@ public final IterOutcome next(final int inputIndex, final RecordBatch b){ return next; } + @Override public final IterOutcome next() { try { stats.startProcessing(); @@ -174,11 +174,11 @@ public void kill(final boolean sendUpstream) { protected abstract void killIncoming(boolean sendUpstream); - public void close(){ + @Override + public void close() { container.clear(); } - @Override public SelectionVector2 getSelectionVector2() { throw new UnsupportedOperationException(); @@ -199,7 +199,6 @@ public VectorWrapper getValueAccessorById(final Class clazz, final int... return container.getValueAccessorById(clazz, ids); } - @Override public WritableBatch getWritableBatch() { // logger.debug("Getting writable batch."); @@ -212,5 +211,4 @@ public WritableBatch getWritableBatch() { public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index dd90cab45ae..e84057bc939 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -25,7 +25,7 @@ import org.apache.drill.exec.vector.SchemaChangeCallBack; public abstract class AbstractSingleRecordBatch extends AbstractRecordBatch { - final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass()); protected final RecordBatch incoming; protected boolean outOfMemory = false; @@ -51,7 +51,7 @@ public IterOutcome innerNext() { IterOutcome upstream = next(incoming); if (state != BatchState.FIRST && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) { do { - for (VectorWrapper w : incoming) { + for (final VectorWrapper w : incoming) { w.clear(); } } while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0); @@ -118,9 +118,9 @@ public IterOutcome innerNext() { public BatchSchema getSchema() { if (container.hasSchema()) { return container.getSchema(); - } else { - return null; } + + return null; } protected abstract boolean setupNewSchema() throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java index f2f94509c17..732129a60b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java @@ -25,21 +25,19 @@ import org.apache.drill.exec.rpc.data.AckSender; public class RawFragmentBatch { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class); + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class); private final FragmentRecordBatch header; private final DrillBuf body; private final AckSender sender; - - private AtomicBoolean ackSent = new AtomicBoolean(false); + private final AtomicBoolean ackSent = new AtomicBoolean(false); public RawFragmentBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) { - super(); this.header = header; - this.body = body; this.sender = sender; + this.body = body; if (body != null) { - body.retain(); + body.retain(1); } } @@ -58,11 +56,10 @@ public String toString() { public void release() { if (body != null) { - body.release(); + body.release(1); } } - public AckSender getSender() { return sender; } @@ -80,5 +77,4 @@ public long getByteCount() { public boolean isAckSent() { return ackSent.get(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 8e3b9e54833..55ae30987ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.drill.common.StackTrace; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; @@ -34,9 +35,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class RecordBatchLoader implements VectorAccessible, Iterable>{ private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class); @@ -63,14 +66,14 @@ public RecordBatchLoader(BufferAllocator allocator) { public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException { if (logger.isTraceEnabled()) { logger.trace("Loading record batch with def {} and data {}", def, buf); - logger.trace("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only")); + logger.trace("Load, ThreadID: {}\n{}", Thread.currentThread().getId(), new StackTrace()); } container.zeroVectors(); valueCount = def.getRecordCount(); boolean schemaChanged = schema == null; final Map oldFields = Maps.newHashMap(); - for (final VectorWrapper wrapper : container) { + for(final VectorWrapper wrapper : container) { final ValueVector vector = wrapper.getValueVector(); oldFields.put(vector.getField(), vector); } @@ -79,7 +82,7 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti try { final List fields = def.getFieldList(); int bufOffset = 0; - for (final SerializedField field : fields) { + for(final SerializedField field : fields) { final MaterializedField fieldDef = MaterializedField.create(field); ValueVector vector = oldFields.remove(fieldDef); @@ -106,7 +109,7 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti // rebuild the schema. final SchemaBuilder builder = BatchSchema.newBuilder(); - for (VectorWrapper v : newVectors) { + for (final VectorWrapper v : newVectors) { builder.addField(v.getField()); } builder.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); @@ -116,7 +119,7 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti } catch (final Throwable cause) { // We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should // adjudicate to call upper layer specific clean up logic. - for (final VectorWrapper wrapper:newVectors) { + for (final VectorWrapper wrapper:newVectors) { wrapper.getValueVector().clear(); } throw cause; @@ -132,12 +135,11 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti return schemaChanged; } + @Override public TypedFieldId getValueVectorId(SchemaPath path) { return container.getValueVectorId(path); } - - // // @SuppressWarnings("unchecked") // public T getValueVectorId(int fieldId, Class clazz) { @@ -152,10 +154,12 @@ public TypedFieldId getValueVectorId(SchemaPath path) { // return (T) v; // } + @Override public int getRecordCount() { return valueCount; } + @Override public VectorWrapper getValueAccessorById(Class clazz, int... ids){ return container.getValueAccessorById(clazz, ids); } @@ -170,11 +174,12 @@ public Iterator> iterator() { return this.container.iterator(); } - public BatchSchema getSchema(){ + @Override + public BatchSchema getSchema() { return schema; } - public void clear(){ + public void clear() { container.clear(); } @@ -184,7 +189,7 @@ public void canonicalize() { // rebuild the schema. SchemaBuilder b = BatchSchema.newBuilder(); - for(VectorWrapper v : container){ + for(final VectorWrapper v : container){ b.addField(v.getField()); } b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java index 6e276280fee..8ca3ec83036 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java @@ -80,7 +80,7 @@ public boolean apply(SchemaPath path) { @Override public void allocate(Map vectorMap) throws OutOfMemoryException { - for (ValueVector v : vectorMap.values()) { + for (final ValueVector v : vectorMap.values()) { v.allocateNew(); } }