DRILL-5351: Minimize bounds checking in var len vectors for Parquet #781

Closed
24 changes: 18 additions & 6 deletions exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -509,7 +509,9 @@ public void setSafe(int index, byte[] value, int start, int length) {
<#if type.major != "VarLen">
throw new UnsupportedOperationException();
<#else>
fillEmpties(index);
if (index > lastSet + 1) {
fillEmpties(index);
}

bits.getMutator().setSafe(index, 1);
values.getMutator().setSafe(index, value, start, length);
@@ -522,7 +524,9 @@ public void setSafe(int index, ByteBuffer value, int start, int length) {
<#if type.major != "VarLen">
throw new UnsupportedOperationException();
<#else>
fillEmpties(index);
if (index > lastSet + 1) {
fillEmpties(index);
}

bits.getMutator().setSafe(index, 1);
values.getMutator().setSafe(index, value, start, length);
@@ -587,7 +591,9 @@ public void set(int index, int isSet<#list fields as field><#if field.include!tr

public void setSafe(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
<#if type.major == "VarLen">
fillEmpties(index);
if (index > lastSet + 1) {
fillEmpties(index);
}
</#if>

bits.getMutator().setSafe(index, isSet);
@@ -600,7 +606,9 @@ public void setSafe(int index, int isSet<#list fields as field><#if field.includ
public void setSafe(int index, Nullable${minor.class}Holder value) {

<#if type.major == "VarLen">
fillEmpties(index);
if (index > lastSet + 1) {
fillEmpties(index);
}
</#if>
bits.getMutator().setSafe(index, value.isSet);
values.getMutator().setSafe(index, value);
@@ -611,7 +619,9 @@ public void setSafe(int index, Nullable${minor.class}Holder value) {
public void setSafe(int index, ${minor.class}Holder value) {

<#if type.major == "VarLen">
fillEmpties(index);
if (index > lastSet + 1) {
fillEmpties(index);
}
</#if>
bits.getMutator().setSafe(index, 1);
values.getMutator().setSafe(index, value);
@@ -622,7 +632,9 @@ public void setSafe(int index, ${minor.class}Holder value) {
<#if !(type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "Interval" || minor.class == "IntervalDay")>
public void setSafe(int index, ${minor.javaType!type.javaType} value) {
<#if type.major == "VarLen">
fillEmpties(index);
if (index > lastSet + 1) {
fillEmpties(index);
}
</#if>
bits.getMutator().setSafe(index, 1);
values.getMutator().setSafe(index, value);
72 changes: 47 additions & 25 deletions exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -502,11 +502,15 @@ public void setSafe(int index, byte[] bytes) {
assert index >= 0;

final int currentOffset = offsetVector.getAccessor().get(index);
while (data.capacity() < currentOffset + bytes.length) {
reAlloc();
}
offsetVector.getMutator().setSafe(index + 1, currentOffset + bytes.length);
data.setBytes(currentOffset, bytes, 0, bytes.length);
try {
data.setBytes(currentOffset, bytes, 0, bytes.length);
} catch (IndexOutOfBoundsException e) {
Contributor


While this is a clever way to avoid checks, it will lead to difficulty when debugging Drill. Intentionally throwing a common exception makes it even harder to find cases where the exception indicates an error.

Let's take a step back. One of the things we need to change in Parquet is to avoid "low-density batches": vectors that hold very little data. It turns out that one cause is the code's assumption that it can tell when it has reached the end of a vector. (There are many bugs, but that is the key idea.)

Vectors don't have that ability today, so the code never worked.

What if we solve that problem, and yours, by changing how DrillBuf works:

if (!data.setBytesIfCapacity(...)) {
  reAlloc();
  data.setBytes(...);
}

The above avoids the spurious exception and provides the means to manage variable-length vectors in Parquet.

Note that the bounds check is still done, but only inside DrillBuf. And, of course, the PR code performs that same check: it is what raises the exception.
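
As a rough illustration of the try-then-grow pattern proposed above, here is a minimal, self-contained sketch. It uses a plain `byte[]` rather than Drill's `DrillBuf`; the class `CapacityCheckedBuffer` and its doubling `reAlloc()` are hypothetical stand-ins, and `setBytesIfCapacity` is only the name suggested in this comment, not an existing DrillBuf method.

```java
import java.util.Arrays;

// Hypothetical stand-in for a growable buffer; this is NOT Drill's DrillBuf.
final class CapacityCheckedBuffer {
  private byte[] data;
  private int reallocCount = 0;

  CapacityCheckedBuffer(int initialCapacity) {
    data = new byte[Math.max(1, initialCapacity)];
  }

  int capacity() { return data.length; }

  int reallocCount() { return reallocCount; }

  byte get(int index) { return data[index]; }

  // Writes only if the bytes fit. Running out of room is reported through
  // the return value, so an exception is reserved for genuine errors.
  boolean setBytesIfCapacity(int offset, byte[] src, int start, int length) {
    if (offset < 0 || start < 0 || length < 0 || start + length > src.length) {
      throw new IndexOutOfBoundsException("invalid arguments");
    }
    if ((long) offset + length > data.length) {
      return false; // not enough room; the caller decides what to do
    }
    System.arraycopy(src, start, data, offset, length);
    return true;
  }

  // Assumed growth policy for this sketch: double the capacity.
  void reAlloc() {
    data = Arrays.copyOf(data, data.length * 2);
    reallocCount++;
  }

  // Caller-side pattern: attempt the write, grow until it succeeds.
  void setSafe(int offset, byte[] src) {
    while (!setBytesIfCapacity(offset, src, 0, src.length)) {
      reAlloc();
    }
  }
}
```

A write that ends exactly at the capacity succeeds without growing; only a write that would run past the end returns `false` and triggers `reAlloc()`.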

Contributor Author


The check for a DrillBuf exceeding its bounds is already done once in Netty (which also throws the exception). What we want is to avoid doing more than one bounds check for every vector write operation. Adding the suggested check would simply move the check from every vector setSafe method to every DrillBuf write, which could impact performance in other parts of the code.
I deliberately changed only the var len vectors, to address a hotspot in Parquet reader performance, because, as you suggest, the right fix is to address the low-density batch problem at the same time as the vector overflow problem. Perhaps a longer discussion is required here.
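
To make the trade-off concrete, the exception-driven fallback in this PR can be sketched roughly as follows. This is a simplified model, not the generated vector code: `ExceptionFallbackVector` is a hypothetical class, a `byte[]` stands in for the DrillBuf, and `System.arraycopy` plays the role of the Netty write that already performs a bounds check.

```java
import java.util.Arrays;

// Hypothetical model of the PR's approach; not the actual Drill template output.
final class ExceptionFallbackVector {
  private byte[] data;
  private int fallbackCount = 0;

  ExceptionFallbackVector(int initialCapacity) {
    data = new byte[Math.max(1, initialCapacity)];
  }

  int capacity() { return data.length; }

  int fallbackCount() { return fallbackCount; }

  byte get(int index) { return data[index]; }

  void setSafe(int offset, byte[] bytes) {
    try {
      // Fast path: the copy routine's own bounds check is the only check.
      System.arraycopy(bytes, 0, data, offset, bytes.length);
    } catch (IndexOutOfBoundsException e) {
      // Slow path: taken only when the write did not fit. A genuinely bad
      // argument (for example a negative offset) also lands here, and is
      // rethrown by the second copy below.
      fallbackCount++;
      while (data.length < (long) offset + bytes.length) {
        data = Arrays.copyOf(data, data.length * 2);
      }
      System.arraycopy(bytes, 0, data, offset, bytes.length);
    }
  }
}
```

In this model a write that fits costs one bounds check, and the exception is paid only when the buffer has to grow.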

Contributor

@paul-rogers paul-rogers Mar 18, 2017


The Netty/DrillBuf code is complex, so this does boil down to details... Yes, I agree that Netty does the bounds checks, if we call Netty code. Consider this code in DrillBuf:

  @Override
  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
    udle.setBytes(index + offset, src, srcIndex, length);
    return this;
  }

  public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
    if (src.isDirect()) {
      checkIndex(index, length);
      PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
          length);
  ...

We have one version that delegates to the "udle", which calls another buf, which in turn calls PooledUnsafeDirectByteBuf, which does a bounds check.

But, we have another method which cuts out the middleman and just does a direct memory copy. Given that, we could certainly add a version that does a bounds check and copy from heap into direct memory. Just use this Netty method:

class  PlatformDependent ...
    public static void copyMemory(byte[] src, int srcIndex, long dstAddr, long length) ...

So, something like this:

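class DrillBuf ...
  public boolean setIfCapacity(int index, byte[] data, int len) {
    // A write that ends exactly at capacity() still fits; reject only when it runs past.
    if (index + len > capacity()) { return false; }
    // Copy from the heap array straight into this buffer's direct memory.
    PlatformDependent.copyMemory(data, 0,
      this.memoryAddress() + index,
      len);
    return true;
  }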

Of course, this assumes an implementation of the underlying direct memory, but, as we saw, we are already doing something similar.

Would this work and perform as well as the exception-based approach?
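
One way to experiment with the idea above without touching Netty internals is a small sketch on `java.nio.ByteBuffer`. The class `DirectCopySketch` is hypothetical, and `setIfCapacity` mirrors only the shape of the proposed method. Note that `ByteBuffer.put` performs its own bounds check, so this says nothing about performance; it only shows the capacity test and the heap-to-direct copy.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch on a direct ByteBuffer; not DrillBuf or Netty code.
final class DirectCopySketch {
  private DirectCopySketch() {}

  // Copies len bytes from a heap array into dst at the given offset, or
  // returns false (leaving dst untouched) when the bytes would not fit.
  static boolean setIfCapacity(ByteBuffer dst, int offset, byte[] src, int len) {
    if (offset < 0 || len < 0 || len > src.length) {
      throw new IndexOutOfBoundsException("invalid arguments");
    }
    if ((long) offset + len > dst.capacity()) {
      return false;
    }
    // Use a duplicate so the caller's position and limit are not disturbed;
    // the duplicate shares the same underlying memory.
    ByteBuffer view = dst.duplicate();
    view.clear();
    view.position(offset);
    view.put(src, 0, len);
    return true;
  }
}
```

For an 8-byte buffer, writing 4 bytes at offset 4 fits exactly and returns `true`, while the same write at offset 5 returns `false`.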

Contributor


By the way, if the suggested change blows up the scope of this fix, then the original proposal is fine; we can always adjust it later if needed when we solve the low-density batch problem.

while (data.capacity() < currentOffset + bytes.length) {
reAlloc();
}
data.setBytes(currentOffset, bytes, 0, bytes.length);
}
}

/**
@@ -528,24 +532,31 @@ public void setSafe(int index, ByteBuffer bytes, int start, int length) {
assert index >= 0;

int currentOffset = offsetVector.getAccessor().get(index);

while (data.capacity() < currentOffset + length) {
reAlloc();
}
offsetVector.getMutator().setSafe(index + 1, currentOffset + length);
data.setBytes(currentOffset, bytes, start, length);
try {
data.setBytes(currentOffset, bytes, start, length);
} catch (IndexOutOfBoundsException e) {
while (data.capacity() < currentOffset + length) {
reAlloc();
}
data.setBytes(currentOffset, bytes, start, length);
}
}

public void setSafe(int index, byte[] bytes, int start, int length) {
assert index >= 0;

final int currentOffset = offsetVector.getAccessor().get(index);

while (data.capacity() < currentOffset + length) {
reAlloc();
}
offsetVector.getMutator().setSafe(index + 1, currentOffset + length);
data.setBytes(currentOffset, bytes, start, length);
try {
data.setBytes(currentOffset, bytes, start, length);
} catch (IndexOutOfBoundsException e) {
while (data.capacity() < currentOffset + length) {
reAlloc();
}
data.setBytes(currentOffset, bytes, start, length);
}
}

@Override
@@ -562,12 +573,16 @@ public void setSafe(int index, int start, int end, DrillBuf buffer){
final int len = end - start;
final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});

while(data.capacity() < outputStart + len) {
reAlloc();
offsetVector.getMutator().setSafe( index+1, outputStart + len);
try{
buffer.getBytes(start, data, outputStart, len);
} catch (IndexOutOfBoundsException e) {
while (data.capacity() < outputStart + len) {
reAlloc();
}
buffer.getBytes(start, data, outputStart, len);
}

offsetVector.getMutator().setSafe( index+1, outputStart + len);
buffer.getBytes(start, data, outputStart, len);
}

public void setSafe(int index, Nullable${minor.class}Holder holder){
@@ -579,11 +594,14 @@ public void setSafe(int index, Nullable${minor.class}Holder holder){

int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});

while(data.capacity() < outputStart + len) {
reAlloc();
try {
holder.buffer.getBytes(start, data, outputStart, len);
} catch (IndexOutOfBoundsException e) {
while (data.capacity() < outputStart + len) {
reAlloc();
}
holder.buffer.getBytes(start, data, outputStart, len);
}

holder.buffer.getBytes(start, data, outputStart, len);
offsetVector.getMutator().setSafe( index+1, outputStart + len);
}

@@ -593,11 +611,15 @@ public void setSafe(int index, ${minor.class}Holder holder){
final int len = end - start;
final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});

while(data.capacity() < outputStart + len) {
reAlloc();
}

holder.buffer.getBytes(start, data, outputStart, len);
try {
holder.buffer.getBytes(start, data, outputStart, len);
} catch (IndexOutOfBoundsException e) {
while(data.capacity() < outputStart + len) {
reAlloc();
}
holder.buffer.getBytes(start, data, outputStart, len);
}
offsetVector.getMutator().setSafe( index+1, outputStart + len);
}
