Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) exte
}

object ArrayBasedMapData {
/**
 * Builds an [[ArrayBasedMapData]] from a Scala `Map` by splitting its entries
 * into parallel key and value arrays. Pairing follows the map's iteration
 * order, so `keys(i)` corresponds to `values(i)`.
 */
def apply(map: Map[Any, Any]): ArrayBasedMapData = {
  val (keys, values) = map.toArray.unzip
  ArrayBasedMapData(keys, values)
}

/**
 * Wraps the given key and value arrays in [[GenericArrayData]] and pairs them
 * into a new [[ArrayBasedMapData]]. The two arrays are index-aligned: the
 * i-th key maps to the i-th value.
 */
def apply(keys: Array[Any], values: Array[Any]): ArrayBasedMapData = {
  val keyData = new GenericArrayData(keys)
  val valueData = new GenericArrayData(values)
  new ArrayBasedMapData(keyData, valueData)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ final class Decimal extends Ordered[Decimal] with Serializable {
if (precision < 19) {
return null // Requested precision is too low to represent this value
}
this.decimalVal = BigDecimal(unscaled)
this.decimalVal = BigDecimal(unscaled, scale)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a bug that warrants a separate JIRA ticket.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.longVal = 0L
} else {
val p = POW_10(math.min(precision, MAX_LONG_DIGITS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.columnar

import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -61,6 +62,10 @@ private[sql] abstract class BasicColumnAccessor[JvmType](
protected def underlyingBuffer = buffer
}

// Column accessor for NullType columns, backed by the NULL column type.
// NOTE(review): the name is inconsistent with every sibling accessor
// (BooleanColumnAccessor, ByteColumnAccessor, ...) — consider renaming to
// NullColumnAccessor (the ColumnAccessor.apply match arm must change too).
private[sql] class NullColumnAccess(buffer: ByteBuffer)
extends BasicColumnAccessor[Any](buffer, NULL)
with NullableColumnAccessor

private[sql] abstract class NativeColumnAccessor[T <: AtomicType](
override protected val buffer: ByteBuffer,
override protected val columnType: NativeColumnType[T])
Expand Down Expand Up @@ -96,18 +101,31 @@ private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[Array[Byte]](buffer, BINARY)
with NullableColumnAccessor

private[sql] class FixedDecimalColumnAccessor(buffer: ByteBuffer, precision: Int, scale: Int)
extends NativeColumnAccessor(buffer, FIXED_DECIMAL(precision, scale))
// Accessor for decimal columns stored via the native COMPACT_DECIMAL column
// type; ColumnAccessor.apply selects it when
// dataType.precision <= Decimal.MAX_LONG_DIGITS.
private[sql] class CompactDecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
extends NativeColumnAccessor(buffer, COMPACT_DECIMAL(dataType))

private[sql] class DecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe LargeDecimalColumnAccessor according to the renaming change?

extends BasicColumnAccessor[Decimal](buffer, LARGE_DECIMAL(dataType))
with NullableColumnAccessor

// Accessor for struct columns: reads values as InternalRow via the
// STRUCT column type for the given StructType.
private[sql] class StructColumnAccessor(buffer: ByteBuffer, dataType: StructType)
extends BasicColumnAccessor[InternalRow](buffer, STRUCT(dataType))
with NullableColumnAccessor

// Accessor for array columns: reads values as ArrayData via the
// ARRAY column type for the given ArrayType.
private[sql] class ArrayColumnAccessor(buffer: ByteBuffer, dataType: ArrayType)
extends BasicColumnAccessor[ArrayData](buffer, ARRAY(dataType))
with NullableColumnAccessor

private[sql] class GenericColumnAccessor(buffer: ByteBuffer, dataType: DataType)
extends BasicColumnAccessor[Array[Byte]](buffer, GENERIC(dataType))
// Accessor for map columns: reads values as MapData via the
// MAP column type for the given MapType.
private[sql] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType)
extends BasicColumnAccessor[MapData](buffer, MAP(dataType))
with NullableColumnAccessor

private[sql] object ColumnAccessor {
def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
val buf = buffer.order(ByteOrder.nativeOrder)

dataType match {
case NullType => new NullColumnAccess(buf)
case BooleanType => new BooleanColumnAccessor(buf)
case ByteType => new ByteColumnAccessor(buf)
case ShortType => new ShortColumnAccessor(buf)
Expand All @@ -117,9 +135,15 @@ private[sql] object ColumnAccessor {
case DoubleType => new DoubleColumnAccessor(buf)
case StringType => new StringColumnAccessor(buf)
case BinaryType => new BinaryColumnAccessor(buf)
case DecimalType.Fixed(precision, scale) if precision < 19 =>
new FixedDecimalColumnAccessor(buf, precision, scale)
case other => new GenericColumnAccessor(buf, other)
case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
new CompactDecimalColumnAccessor(buf, dt)
case dt: DecimalType => new DecimalColumnAccessor(buf, dt)
case struct: StructType => new StructColumnAccessor(buf, struct)
case array: ArrayType => new ArrayColumnAccessor(buf, array)
case map: MapType => new MapColumnAccessor(buf, map)
case udt: UserDefinedType[_] => ColumnAccessor(udt.sqlType, buffer)
case other =>
throw new Exception(s"not support type: $other")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ private[sql] class BasicColumnBuilder[JvmType](
}
}

// Builder for NullType columns, backed by the NULL column type and
// ObjectColumnStats; selected by ColumnBuilder.apply for NullType.
private[sql] class NullColumnBuilder
extends BasicColumnBuilder[Any](new ObjectColumnStats(NullType), NULL)
with NullableColumnBuilder

private[sql] abstract class ComplexColumnBuilder[JvmType](
columnStats: ColumnStats,
columnType: ColumnType[JvmType])
Expand Down Expand Up @@ -109,16 +113,20 @@ private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringCol

// Builder for binary columns, using BinaryColumnStats and the BINARY column type.
private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY)

private[sql] class FixedDecimalColumnBuilder(
precision: Int,
scale: Int)
extends NativeColumnBuilder(
new FixedDecimalColumnStats(precision, scale),
FIXED_DECIMAL(precision, scale))
// Builder for decimals stored via the native COMPACT_DECIMAL column type;
// ColumnBuilder.apply selects it when precision <= Decimal.MAX_LONG_DIGITS.
private[sql] class CompactDecimalColumnBuilder(dataType: DecimalType)
extends NativeColumnBuilder(new DecimalColumnStats(dataType), COMPACT_DECIMAL(dataType))

// Builder for decimals that do not fit the compact representation, stored via
// the LARGE_DECIMAL column type.
// NOTE(review): this pairs DecimalColumnStats with LARGE_DECIMAL while the
// sibling complex builders use ObjectColumnStats — confirm this asymmetry is
// intentional (see the review thread on DecimalColumnStats sizing).
private[sql] class DecimalColumnBuilder(dataType: DecimalType)
extends ComplexColumnBuilder(new DecimalColumnStats(dataType), LARGE_DECIMAL(dataType))

// Builder for struct columns, using ObjectColumnStats and the STRUCT column type.
private[sql] class StructColumnBuilder(dataType: StructType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), STRUCT(dataType))

// Builder for array columns, using ObjectColumnStats and the ARRAY column type.
private[sql] class ArrayColumnBuilder(dataType: ArrayType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), ARRAY(dataType))

// NOTE: array, struct and map are now handled by the dedicated builders above/below (ArrayColumnBuilder, StructColumnBuilder, MapColumnBuilder); the earlier TODO to add them is resolved.
private[sql] class GenericColumnBuilder(dataType: DataType)
extends ComplexColumnBuilder(new GenericColumnStats(dataType), GENERIC(dataType))
// Builder for map columns, using ObjectColumnStats and the MAP column type.
private[sql] class MapColumnBuilder(dataType: MapType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType))

private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
Expand All @@ -145,6 +153,7 @@ private[sql] object ColumnBuilder {
columnName: String = "",
useCompression: Boolean = false): ColumnBuilder = {
val builder: ColumnBuilder = dataType match {
case NullType => new NullColumnBuilder
case BooleanType => new BooleanColumnBuilder
case ByteType => new ByteColumnBuilder
case ShortType => new ShortColumnBuilder
Expand All @@ -154,9 +163,16 @@ private[sql] object ColumnBuilder {
case DoubleType => new DoubleColumnBuilder
case StringType => new StringColumnBuilder
case BinaryType => new BinaryColumnBuilder
case DecimalType.Fixed(precision, scale) if precision < 19 =>
new FixedDecimalColumnBuilder(precision, scale)
case other => new GenericColumnBuilder(other)
case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
new CompactDecimalColumnBuilder(dt)
case dt: DecimalType => new DecimalColumnBuilder(dt)
case struct: StructType => new StructColumnBuilder(struct)
case array: ArrayType => new ArrayColumnBuilder(array)
case map: MapType => new MapColumnBuilder(map)
case udt: UserDefinedType[_] =>
return apply(udt.sqlType, initialSize, columnName, useCompression)
case other =>
throw new Exception(s"not suppported type: $other")
}

builder.initialize(initialSize, columnName, useCompression)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ private[sql] class BinaryColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
}

private[sql] class FixedDecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
private[sql] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
def this(dt: DecimalType) = this(dt.precision, dt.scale)

protected var upper: Decimal = null
protected var lower: Decimal = null

Expand All @@ -245,16 +247,17 @@ private[sql] class FixedDecimalColumnStats(precision: Int, scale: Int) extends C
val value = row.getDecimal(ordinal, precision, scale)
if (upper == null || value.compareTo(upper) > 0) upper = value
if (lower == null || value.compareTo(lower) < 0) lower = value
sizeInBytes += FIXED_DECIMAL.defaultSize
// TODO: this is not right for DecimalType with precision > 18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use ObjectColumnStats for large decimal?

sizeInBytes += 8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For decimals with precision greater than 18, CatalystSchemaConverter.minBytesForPrecision can be useful here. At least it provides a relatively tight upper limit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, thanks. I think it's better to get the number of written bytes from the buffer — no more guessing.

}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class GenericColumnStats(dataType: DataType) extends ColumnStats {
val columnType = GENERIC(dataType)
private[sql] class ObjectColumnStats(dataType: DataType) extends ColumnStats {
val columnType = ColumnType(dataType)

override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
Expand Down
Loading