Skip to content

Commit

Permalink
[SPARK-44805][SQL] getBytes/getShorts/getInts/etc. should work in a column vector that has a dictionary
Browse files Browse the repository at this point in the history

Change getBytes/getShorts/getInts/getLongs/getFloats/getDoubles in `OnHeapColumnVector` and `OffHeapColumnVector` to use the dictionary, if present.

The following query gets incorrect results:
```
drop table if exists t1;

create table t1 using parquet as
select * from values
(named_struct('f1', array(1, 2, 3), 'f2', array(1, 1, 2)))
as (value);

select cast(value as struct<f1:array<double>,f2:array<int>>) AS value from t1;

{"f1":[1.0,2.0,3.0],"f2":[0,0,0]}

```
The result should be:
```
{"f1":[1.0,2.0,3.0],"f2":[1,1,2]}
```
The cast operation copies the second array by calling `ColumnarArray#copy`, which in turn calls `ColumnarArray#toIntArray`, which in turn calls `ColumnVector#getInts` on the underlying column vector (which is either an `OnHeapColumnVector` or an `OffHeapColumnVector`). The implementation of `getInts` in either concrete class assumes there is no dictionary and does not use it if it is present (in fact, it even asserts that there is no dictionary). However, in the above example, the column vector associated with the second array does have a dictionary:
```
java -cp ~/github/parquet-mr/parquet-tools/target/parquet-tools-1.10.1.jar org.apache.parquet.tools.Main meta ./spark-warehouse/t1/part-00000-122fdd53-8166-407b-aec5-08e0c2845c3d-c000.snappy.parquet
...
row group 1: RC:1 TS:112 OFFSET:4
-------------------------------------------------------------------------------------------------------------------------------------------------------
value:
.f1:
..list:
...element:   INT32 SNAPPY DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:RLE,PLAIN ST:[min: 1, max: 3, num_nulls: 0]
.f2:
..list:
...element:   INT32 SNAPPY DO:51 FPO:80 SZ:69/65/0.94 VC:3 ENC:RLE,PLAIN_DICTIONARY ST:[min: 1, max: 2, num_nulls: 0]

```
The same bug also occurs when field f2 is a map. This PR fixes that case as well.

User-facing change: No, except for fixing the correctness issue.

How this was tested: New tests.

Authored using generative AI tooling: No.

Closes #42850 from bersprockets/vector_oddity.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit fac236e)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
bersprockets authored and dongjoon-hyun committed Sep 8, 2023
1 parent a4d40e8 commit 8bb0546
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
public final class ColumnDictionary implements Dictionary {
private int[] intDictionary;
private long[] longDictionary;
private float[] floatDictionary;
private double[] doubleDictionary;

public ColumnDictionary(int[] dictionary) {
this.intDictionary = dictionary;
Expand All @@ -31,6 +33,14 @@ public ColumnDictionary(long[] dictionary) {
this.longDictionary = dictionary;
}

public ColumnDictionary(float[] dictionary) {
this.floatDictionary = dictionary;
}

public ColumnDictionary(double[] dictionary) {
this.doubleDictionary = dictionary;
}

@Override
public int decodeToInt(int id) {
return intDictionary[id];
Expand All @@ -42,14 +52,10 @@ public long decodeToLong(int id) {
}

@Override
public float decodeToFloat(int id) {
throw new UnsupportedOperationException("Dictionary encoding does not support float");
}
public float decodeToFloat(int id) { return floatDictionary[id]; }

@Override
public double decodeToDouble(int id) {
throw new UnsupportedOperationException("Dictionary encoding does not support double");
}
public double decodeToDouble(int id) { return doubleDictionary[id]; }

@Override
public byte[] decodeToBinary(int id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,14 @@ public byte getByte(int rowId) {

@Override
public byte[] getBytes(int rowId, int count) {
assert(dictionary == null);
byte[] array = new byte[count];
Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
if (dictionary == null) {
Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
} else {
for (int i = 0; i < count; i++) {
array[i] = getByte(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -266,9 +271,14 @@ public short getShort(int rowId) {

@Override
public short[] getShorts(int rowId, int count) {
assert(dictionary == null);
short[] array = new short[count];
Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count * 2L);
if (dictionary == null) {
Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count * 2L);
} else {
for (int i = 0; i < count; i++) {
array[i] = getShort(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -327,9 +337,14 @@ public int getInt(int rowId) {

@Override
public int[] getInts(int rowId, int count) {
assert(dictionary == null);
int[] array = new int[count];
Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count * 4L);
if (dictionary == null) {
Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count * 4L);
} else {
for (int i = 0; i < count; i++) {
array[i] = getInt(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -399,9 +414,14 @@ public long getLong(int rowId) {

@Override
public long[] getLongs(int rowId, int count) {
assert(dictionary == null);
long[] array = new long[count];
Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count * 8L);
if (dictionary == null) {
Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count * 8L);
} else {
for (int i = 0; i < count; i++) {
array[i] = getLong(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -458,9 +478,14 @@ public float getFloat(int rowId) {

@Override
public float[] getFloats(int rowId, int count) {
assert(dictionary == null);
float[] array = new float[count];
Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count * 4L);
if (dictionary == null) {
Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count * 4L);
} else {
for (int i = 0; i < count; i++) {
array[i] = getFloat(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -518,9 +543,15 @@ public double getDouble(int rowId) {

@Override
public double[] getDoubles(int rowId, int count) {
assert(dictionary == null);
double[] array = new double[count];
Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8L);
if (dictionary == null) {
Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET,
count * 8L);
} else {
for (int i = 0; i < count; i++) {
array[i] = getDouble(rowId + i);
}
}
return array;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,14 @@ public byte getByte(int rowId) {

@Override
public byte[] getBytes(int rowId, int count) {
assert(dictionary == null);
byte[] array = new byte[count];
System.arraycopy(byteData, rowId, array, 0, count);
if (dictionary == null) {
System.arraycopy(byteData, rowId, array, 0, count);
} else {
for (int i = 0; i < count; i++) {
array[i] = getByte(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -263,9 +268,14 @@ public short getShort(int rowId) {

@Override
public short[] getShorts(int rowId, int count) {
assert(dictionary == null);
short[] array = new short[count];
System.arraycopy(shortData, rowId, array, 0, count);
if (dictionary == null) {
System.arraycopy(shortData, rowId, array, 0, count);
} else {
for (int i = 0; i < count; i++) {
array[i] = getShort(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -319,9 +329,14 @@ public int getInt(int rowId) {

@Override
public int[] getInts(int rowId, int count) {
assert(dictionary == null);
int[] array = new int[count];
System.arraycopy(intData, rowId, array, 0, count);
if (dictionary == null) {
System.arraycopy(intData, rowId, array, 0, count);
} else {
for (int i = 0; i < count; i++) {
array[i] = getInt(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -385,9 +400,14 @@ public long getLong(int rowId) {

@Override
public long[] getLongs(int rowId, int count) {
assert(dictionary == null);
long[] array = new long[count];
System.arraycopy(longData, rowId, array, 0, count);
if (dictionary == null) {
System.arraycopy(longData, rowId, array, 0, count);
} else {
for (int i = 0; i < count; i++) {
array[i] = getLong(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -437,9 +457,14 @@ public float getFloat(int rowId) {

@Override
public float[] getFloats(int rowId, int count) {
assert(dictionary == null);
float[] array = new float[count];
System.arraycopy(floatData, rowId, array, 0, count);
if (dictionary == null) {
System.arraycopy(floatData, rowId, array, 0, count);
} else {
for (int i = 0; i < count; i++) {
array[i] = getFloat(rowId + i);
}
}
return array;
}

Expand Down Expand Up @@ -491,9 +516,14 @@ public double getDouble(int rowId) {

@Override
public double[] getDoubles(int rowId, int count) {
assert(dictionary == null);
double[] array = new double[count];
System.arraycopy(doubleData, rowId, array, 0, count);
if (dictionary == null) {
System.arraycopy(doubleData, rowId, array, 0, count);
} else {
for (int i = 0; i < count; i++) {
array[i] = getDouble(rowId + i);
}
}
return array;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,16 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
checkAnswer(sql("select * from tbl"), expected)
}
}

test("SPARK-44805: cast of struct with two arrays") {
withTable("tbl") {
sql("create table tbl (value struct<f1:array<int>,f2:array<int>>) using parquet")
sql("insert into tbl values (named_struct('f1', array(1, 2, 3), 'f2', array(1, 1, 2)))")
val df = sql("select cast(value as struct<f1:array<double>,f2:array<int>>) AS value from tbl")
val expected = Row(Row(Array(1.0d, 2.0d, 3.0d), Array(1, 1, 2))) :: Nil
checkAnswer(df, expected)
}
}
}

class ParquetV1QuerySuite extends ParquetQuerySuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterEach

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.columnar.ColumnAccessor
import org.apache.spark.sql.execution.columnar.{ColumnAccessor, ColumnDictionary}
import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarArray
Expand Down Expand Up @@ -383,6 +383,84 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(testVector.getStruct(1).get(1, DoubleType) === 5.67)
}

testVectors("SPARK-44805: getInts with dictionary", 3, IntegerType) { testVector =>
val dict = new ColumnDictionary(Array[Int](7, 8, 9))
testVector.setDictionary(dict)
testVector.reserveDictionaryIds(3)
testVector.getDictionaryIds.putInt(0, 0)
testVector.getDictionaryIds.putInt(1, 1)
testVector.getDictionaryIds.putInt(2, 2)

assert(testVector.getInts(0, 3)(0) == 7)
assert(testVector.getInts(0, 3)(1) == 8)
assert(testVector.getInts(0, 3)(2) == 9)
}

testVectors("SPARK-44805: getShorts with dictionary", 3, ShortType) { testVector =>
val dict = new ColumnDictionary(Array[Int](7, 8, 9))
testVector.setDictionary(dict)
testVector.reserveDictionaryIds(3)
testVector.getDictionaryIds.putInt(0, 0)
testVector.getDictionaryIds.putInt(1, 1)
testVector.getDictionaryIds.putInt(2, 2)

assert(testVector.getShorts(0, 3)(0) == 7)
assert(testVector.getShorts(0, 3)(1) == 8)
assert(testVector.getShorts(0, 3)(2) == 9)
}

testVectors("SPARK-44805: getBytes with dictionary", 3, ByteType) { testVector =>
val dict = new ColumnDictionary(Array[Int](7, 8, 9))
testVector.setDictionary(dict)
testVector.reserveDictionaryIds(3)
testVector.getDictionaryIds.putInt(0, 0)
testVector.getDictionaryIds.putInt(1, 1)
testVector.getDictionaryIds.putInt(2, 2)

assert(testVector.getBytes(0, 3)(0) == 7)
assert(testVector.getBytes(0, 3)(1) == 8)
assert(testVector.getBytes(0, 3)(2) == 9)
}

testVectors("SPARK-44805: getLongs with dictionary", 3, LongType) { testVector =>
val dict = new ColumnDictionary(Array[Long](2147483648L, 2147483649L, 2147483650L))
testVector.setDictionary(dict)
testVector.reserveDictionaryIds(3)
testVector.getDictionaryIds.putInt(0, 0)
testVector.getDictionaryIds.putInt(1, 1)
testVector.getDictionaryIds.putInt(2, 2)

assert(testVector.getLongs(0, 3)(0) == 2147483648L)
assert(testVector.getLongs(0, 3)(1) == 2147483649L)
assert(testVector.getLongs(0, 3)(2) == 2147483650L)
}

testVectors("SPARK-44805: getFloats with dictionary", 3, FloatType) { testVector =>
val dict = new ColumnDictionary(Array[Float](0.1f, 0.2f, 0.3f))
testVector.setDictionary(dict)
testVector.reserveDictionaryIds(3)
testVector.getDictionaryIds.putInt(0, 0)
testVector.getDictionaryIds.putInt(1, 1)
testVector.getDictionaryIds.putInt(2, 2)

assert(testVector.getFloats(0, 3)(0) == 0.1f)
assert(testVector.getFloats(0, 3)(1) == 0.2f)
assert(testVector.getFloats(0, 3)(2) == 0.3f)
}

testVectors("SPARK-44805: getDoubles with dictionary", 3, DoubleType) { testVector =>
val dict = new ColumnDictionary(Array[Double](1342.17727d, 1342.17728d, 1342.17729d))
testVector.setDictionary(dict)
testVector.reserveDictionaryIds(3)
testVector.getDictionaryIds.putInt(0, 0)
testVector.getDictionaryIds.putInt(1, 1)
testVector.getDictionaryIds.putInt(2, 2)

assert(testVector.getDoubles(0, 3)(0) == 1342.17727d)
assert(testVector.getDoubles(0, 3)(1) == 1342.17728d)
assert(testVector.getDoubles(0, 3)(2) == 1342.17729d)
}

test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
withVector(new OffHeapColumnVector(8, arrayType)) { testVector =>
val data = testVector.arrayData()
Expand Down

0 comments on commit 8bb0546

Please sign in to comment.