Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Oct 19, 2015
1 parent 4511781 commit f9151cc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import org.apache.spark.unsafe.types.UTF8String

/**
* A helper class for fast reading of Int/Long/Float/Double from a ByteBuffer in native order.
*
* WARNING: This only works with HeapByteBuffer
*/
object ByteBufferHelper {
def getInt(buffer: ByteBuffer): Int = {
Expand Down Expand Up @@ -360,7 +362,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 8) {
}

override def extract(buffer: ByteBuffer): UTF8String = {
val length = ByteBufferHelper.getInt(buffer)
val length = buffer.getInt()
assert(buffer.hasArray)
val base = buffer.array()
val offset = buffer.arrayOffset()
Expand Down Expand Up @@ -426,7 +428,7 @@ private[sql] sealed abstract class ByteArrayColumnType[JvmType](val defaultSize:
}

override def extract(buffer: ByteBuffer): JvmType = {
val length = ByteBufferHelper.getInt(buffer)
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
deserialize(bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera

protected def create(columnTypes: Seq[DataType]): ColumnarIterator = {
val ctx = newCodeGenContext()
val (creaters, accesses) = columnTypes.zipWithIndex.map { case (dt, index) =>
val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) =>
val accessorName = ctx.freshName("accessor")
val accessorCls = dt match {
case NullType => classOf[NullColumnAccessor].getName
Expand Down Expand Up @@ -92,7 +92,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
private byte[][] buffers = null;

private int currentRow = 0;
private int totalRows = 0;
private int numRowsInBatch = 0;

private scala.collection.Iterator input = null;
private MutableRow mutableRow = null;
Expand All @@ -117,7 +117,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
}

public boolean hasNext() {
if (currentRow < totalRows) {
if (currentRow < numRowsInBatch) {
return true;
}
if (!input.hasNext()) {
Expand All @@ -126,17 +126,17 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera

${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next();
currentRow = 0;
totalRows = batch.count();
for (int i=0; i<columnIndexes.length; i++) {
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
${creaters.mkString("\n")}
${initializeAccessors.mkString("\n")}

return hasNext();
}

public InternalRow next() {
${accesses.mkString("\n")}
${extractors.mkString("\n")}
currentRow += 1;
return mutableRow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ private[sql] object InMemoryRelation {
tableName)()
}

private[sql] case class CachedBatch(count: Int, buffers: Array[Array[Byte]], stats: InternalRow)
/**
* CachedBatch is a cached batch of rows.
*
* @param numRows The total number of rows in this batch
* @param buffers The buffers for serialized columns, one byte array per column
* @param stats The statistics of the columns
*/
private[sql] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

private[sql] case class InMemoryRelation(
output: Seq[Attribute],
Expand Down

0 comments on commit f9151cc

Please sign in to comment.