[CARBONDATA-2964] Fix for unsupported float data type bug
Problem:

If a blocklet contains multiple pages, comparator creation had no check for the float and byte data types, so an "Unsupported data type" exception was thrown.
Byte data was also being read as a double.
Solution:

Add a check for the float data type in the comparator (a minimal sketch of the comparison technique is shown below).
Add the missing byte condition in AbstractScannedResultCollector, which previously let byte values fall through to the double flow.
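
For illustration, here is a minimal standalone sketch of the ByteBuffer-based comparison the patch adds for float measure values. The class and method names are mine, not part of the CarbonData code base, and the byte arrays are assumed to each hold a single big-endian float.

```java
import java.nio.ByteBuffer;

public class FloatMeasureCompareSketch {

  // Compare two floats that were serialized into byte arrays, clamping the
  // result to -1/0/1 the same way the patched comparator does.
  static int compareFloatBytes(byte[] first, byte[] second) {
    ByteBuffer firstBuffer = ByteBuffer.allocate(8);
    firstBuffer.put(first);
    ByteBuffer secondBuffer = ByteBuffer.allocate(8);
    secondBuffer.put(second);
    firstBuffer.flip();   // switch the buffers from write mode to read mode
    secondBuffer.flip();
    float compare = firstBuffer.getFloat() - secondBuffer.getFloat();
    if (compare > 0) {
      return 1;
    } else if (compare < 0) {
      return -1;
    }
    return 0;
  }

  public static void main(String[] args) {
    byte[] a = ByteBuffer.allocate(4).putFloat(1.5f).array();
    byte[] b = ByteBuffer.allocate(4).putFloat(2.5f).array();
    System.out.println(compareFloatBytes(a, b)); // prints -1 because 1.5 < 2.5
  }
}
```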

This closes #2753
kunal642 authored and ravipesala committed Sep 25, 2018
1 parent 5443b22 commit 1d4d240
Showing 7 changed files with 63 additions and 2 deletions.
@@ -145,6 +145,8 @@ Object getMeasureData(ColumnPage dataChunk, int index, CarbonMeasure carbonMeasure)
return dataChunk.getLong(index);
} else if (dataType == DataTypes.FLOAT) {
return dataChunk.getFloat(index);
} else if (dataType == DataTypes.BYTE) {
return dataChunk.getByte(index);
} else if (DataTypes.isDecimal(dataType)) {
BigDecimal bigDecimalMsrValue = dataChunk.getDecimal(index);
if (null != bigDecimalMsrValue && carbonMeasure.getScale() > bigDecimalMsrValue.scale()) {
@@ -464,7 +464,7 @@ public static DataChunk3 getMeasureDataChunk3(EncodedBlocklet encodedBlocklet, i
private static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
ByteBuffer firstBuffer = null;
ByteBuffer secondBuffer = null;
- if (dataType == DataTypes.BOOLEAN) {
+ if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
return first[0] - second[0];
} else if (dataType == DataTypes.DOUBLE) {
firstBuffer = ByteBuffer.allocate(8);
@@ -480,6 +480,20 @@ private static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
compare = -1;
}
return (int) compare;
} else if (dataType == DataTypes.FLOAT) {
firstBuffer = ByteBuffer.allocate(8);
firstBuffer.put(first);
secondBuffer = ByteBuffer.allocate(8);
secondBuffer.put(second);
firstBuffer.flip();
secondBuffer.flip();
double compare = firstBuffer.getFloat() - secondBuffer.getFloat();
if (compare > 0) {
compare = 1;
} else if (compare < 0) {
compare = -1;
}
return (int) compare;
} else if (dataType == DataTypes.LONG || dataType == DataTypes.INT
|| dataType == DataTypes.SHORT) {
firstBuffer = ByteBuffer.allocate(8);
2 changes: 1 addition & 1 deletion docs/ddl-of-carbondata.md
@@ -127,7 +127,7 @@ CarbonData DDL statements are documented here, which includes:
This property is for users to specify which columns belong to the MDK (Multi-Dimensions-Key) index.
* If users don't specify the "SORT_COLUMNS" property, by default the MDK index is built using all dimension columns except complex data type columns.
* If this property is specified but with an empty argument, then the table will be loaded without sort.
- * This supports only string, date, timestamp, short, int, long, and boolean data types.
+ * This supports only string, date, timestamp, short, int, long, byte and boolean data types.
Suggested use cases: only build the MDK index for required columns; it might help to improve the data loading performance.

```
2 changes: 2 additions & 0 deletions docs/sdk-guide.md
@@ -191,6 +191,8 @@ Each of the SQL data types is mapped into a data type of the SDK. Following are the mappings:
| BIGINT | DataTypes.LONG |
| DOUBLE | DataTypes.DOUBLE |
| VARCHAR | DataTypes.STRING |
| FLOAT | DataTypes.FLOAT |
| BYTE | DataTypes.BYTE |
| DATE | DataTypes.DATE |
| TIMESTAMP | DataTypes.TIMESTAMP |
| STRING | DataTypes.STRING |
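
As a quick illustration of the two new rows in the mapping above, a hedged Java sketch of declaring FLOAT and BYTE columns through the SDK's Field/Schema API; the import paths and column names are assumptions based on the test added further below.

```java
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class FloatByteSchemaSketch {
  public static void main(String[] args) {
    // One field per newly mapped SQL type, plus a string dimension.
    Field[] fields = new Field[3];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("ratio", DataTypes.FLOAT); // SQL FLOAT -> DataTypes.FLOAT
    fields[2] = new Field("flag", DataTypes.BYTE);   // SQL BYTE  -> DataTypes.BYTE
    Schema schema = new Schema(fields);
    System.out.println(schema);
  }
}
```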
4 changes: 4 additions & 0 deletions docs/supported-data-types-in-carbondata.md
@@ -25,6 +25,10 @@
* BIGINT
* DOUBLE
* DECIMAL
* FLOAT
* BYTE

**NOTE**: Float and Byte are only supported for SDK and FileFormat.

* Date/Time Types
* TIMESTAMP
@@ -1187,6 +1187,43 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
}
}

test("test byte and float for multiple pages") {
val path = new File(warehouse1+"/sdk1").getAbsolutePath
FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
spark.sql("drop table if exists multi_page")
var fields: Array[Field] = new Array[Field](8)
// only the first three slots are used: string, float and byte columns
fields(0) = new Field("a", DataTypes.STRING)
fields(1) = new Field("b", DataTypes.FLOAT)
fields(2) = new Field("c", DataTypes.BYTE)

try {
val builder = CarbonWriter.builder()
val writer =
builder.outputPath(path)
.isTransactionalTable(false)
.uniqueIdentifier(System.nanoTime()).withBlockSize(2)
.buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)

var i = 0
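// Write more rows than fit in a single column page so the blocklet spans
// multiple pages and exercises the comparator path fixed by this commit.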
while (i < 33000) {
val array = Array[String](
String.valueOf(i),
s"$i.3200", "32")
writer.write(array)
i += 1
}
writer.close()
spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
s"'$path'")
assert(spark.sql("select * from multi_page").count() == 33000)
} catch {
case ex: Exception => throw new RuntimeException(ex)
} finally {
FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
}
}

override protected def beforeAll(): Unit = {
drop
createParquetTable
@@ -458,6 +458,8 @@ public void testReadingOfByteAndFloatWithCarbonReader() throws IOException {
for (int j = 0; j < 3; j++) {
assert (actualRow[j].toString().equalsIgnoreCase(expectedRow[j]));
}
assert(actualRow[1] instanceof Byte);
assert(actualRow[2] instanceof Float);
}
carbonReader.close();
} catch (Exception e) {
