Skip to content

Commit

Permalink
[CARBONDATA-2554] Added support for logical type
Browse files Browse the repository at this point in the history
Added support for date and timestamp logical types in AvroCarbonWriter.

This closes #2347
  • Loading branch information
kunal642 authored and kumarvishal09 committed Jun 5, 2018
1 parent 27d7059 commit 2f23486
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public interface DirectDictionaryGenerator {
*/
Object getValueFromSurrogate(int key);

int generateKey(long value);

/**
* The method generate and returns the dictionary / surrogate key for direct dictionary column
* This Method is called while executing filter queries for getting direct surrogate members.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private int generateDirectSurrogateKeyForNonTimestampType(String memberStr) {
}
}

private int generateKey(long timeValue) {
public int generateKey(long timeValue) {
if (timeValue < MIN_VALUE || timeValue > MAX_VALUE) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Value for date type column is not in valid range. Value considered as null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private int generateDirectSurrogateKeyForNonTimestampType(String memberStr) {
}
}

private int generateKey(long timeValue) {
public int generateKey(long timeValue) {
long time = (timeValue - cutOffTimeStamp) / granularityFactor;
int keyValue = -1;
if (time >= (long) Integer.MIN_VALUE && time <= (long) Integer.MAX_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.carbondata.spark.testsuite.createTable

import java.sql.Timestamp
import java.sql.{Date, Timestamp}
import java.io.{File, FileFilter, IOException}
import java.util
import java.util.concurrent.TimeUnit
Expand All @@ -42,6 +42,7 @@ import scala.concurrent.duration.Duration

import org.apache.avro
import org.apache.commons.lang.CharEncoding
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import tech.allegro.schema.json2avro.converter.JsonAvroConverter

import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
Expand Down Expand Up @@ -2151,4 +2152,146 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
writer.close()
}

test("test logical type date") {
  sql("drop table if exists sdkOutputTable")
  FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
  // Avro schema declaring the "date" logical type (int = days since epoch),
  // both as a top-level field and inside a nested record.
  val avroSchemaText =
    """{
      | "namespace": "com.apache.schema",
      | "type": "record",
      | "name": "StudentActivity",
      | "fields": [
      | {
      | "name": "id",
      | "type": {"type" : "int", "logicalType": "date"}
      | },
      | {
      | "name": "course_details",
      | "type": {
      | "name": "course_details",
      | "type": "record",
      | "fields": [
      | {
      | "name": "course_struct_course_time",
      | "type": {"type" : "int", "logicalType": "date"}
      | }
      | ]
      | }
      | }
      | ]
      |}""".stripMargin

  val jsonPayload =
    """{"id": 101, "course_details": { "course_struct_course_time":10}}""".stripMargin
  val parsedSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaText)
  val avroRecord = (new JsonAvroConverter)
    .convertToGenericDataRecord(jsonPayload.getBytes(CharEncoding.UTF_8), parsedSchema)

  // Write the record through the SDK, then expose the output as an external table.
  val avroWriter = CarbonWriter.builder
    .outputPath(writerPath)
    .isTransactionalTable(false)
    .buildWriterForAvroInput(parsedSchema)
  avroWriter.write(avroRecord)
  avroWriter.close()

  sql(
    s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType date, course_details struct<course_struct_course_time: date>) STORED BY
       |'carbondata' LOCATION
       |'$writerPath' """.stripMargin)
  // 101 days after the epoch is 1970-04-12; 10 days after is 1970-01-11.
  checkAnswer(sql("select * from sdkOutputTable"),
    Seq(Row(java.sql.Date.valueOf("1970-04-12"), Row(java.sql.Date.valueOf("1970-01-11")))))
}

test("test logical type timestamp-millis") {
  sql("drop table if exists sdkOutputTable")
  FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
  // Avro schema declaring the "timestamp-millis" logical type
  // (long = milliseconds since epoch), top-level and nested.
  val avroSchemaText =
    """{
      | "namespace": "com.apache.schema",
      | "type": "record",
      | "name": "StudentActivity",
      | "fields": [
      | {
      | "name": "id",
      | "type": {"type" : "long", "logicalType": "timestamp-millis"}
      | },
      | {
      | "name": "course_details",
      | "type": {
      | "name": "course_details",
      | "type": "record",
      | "fields": [
      | {
      | "name": "course_struct_course_time",
      | "type": {"type" : "long", "logicalType": "timestamp-millis"}
      | }
      | ]
      | }
      | }
      | ]
      |}""".stripMargin

  // 172800000 ms = exactly two days after the epoch.
  val jsonPayload =
    """{"id": 172800000,"course_details": { "course_struct_course_time":172800000}}""".stripMargin

  val parsedSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaText)
  val avroRecord = (new JsonAvroConverter)
    .convertToGenericDataRecord(jsonPayload.getBytes(CharEncoding.UTF_8), parsedSchema)

  val avroWriter = CarbonWriter.builder
    .outputPath(writerPath)
    .isTransactionalTable(false)
    .buildWriterForAvroInput(parsedSchema)
  avroWriter.write(avroRecord)
  avroWriter.close()

  sql(
    s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY
       |'carbondata' LOCATION
       |'$writerPath' """.stripMargin)
  // NOTE(review): Timestamp.valueOf parses in the JVM default timezone, so the
  // expected "1970-01-02 16:00:00" appears to assume a fixed test timezone
  // (UTC epoch+2d rendered at UTC-8) — confirm against the test-suite setup.
  checkAnswer(sql("select * from sdkOutputTable"),
    Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
}

test("test logical type-micros timestamp") {
  sql("drop table if exists sdkOutputTable")
  FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
  // Avro schema declaring the "timestamp-micros" logical type
  // (long = microseconds since epoch), top-level and nested.
  val avroSchemaText =
    """{
      | "namespace": "com.apache.schema",
      | "type": "record",
      | "name": "StudentActivity",
      | "fields": [
      | {
      | "name": "id",
      | "type": {"type" : "long", "logicalType": "timestamp-micros"}
      | },
      | {
      | "name": "course_details",
      | "type": {
      | "name": "course_details",
      | "type": "record",
      | "fields": [
      | {
      | "name": "course_struct_course_time",
      | "type": {"type" : "long", "logicalType": "timestamp-micros"}
      | }
      | ]
      | }
      | }
      | ]
      |}""".stripMargin

  // 172800000000 us = exactly two days after the epoch (same instant as the
  // millis test above, expressed in microseconds).
  val jsonPayload =
    """{"id": 172800000000,"course_details": { "course_struct_course_time":172800000000}}""".stripMargin

  val parsedSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaText)
  val avroRecord = (new JsonAvroConverter)
    .convertToGenericDataRecord(jsonPayload.getBytes(CharEncoding.UTF_8), parsedSchema)

  val avroWriter = CarbonWriter.builder
    .outputPath(writerPath)
    .isTransactionalTable(false)
    .buildWriterForAvroInput(parsedSchema)
  avroWriter.write(avroRecord)
  avroWriter.close()

  sql(
    s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY
       |'carbondata' LOCATION
       |'$writerPath' """.stripMargin)
  // NOTE(review): expected value is timezone-dependent (see the millis test) —
  // confirm the suite pins the JVM timezone.
  checkAnswer(sql("select * from sdkOutputTable"),
    Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,11 @@ public int getSurrogateIndex() {
logHolder.setReason(message);
}
} else {
surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) {
surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input);
} else {
surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
}
if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
message = CarbonDataProcessorUtil
Expand Down Expand Up @@ -316,15 +320,36 @@ public int getSurrogateIndex() {
if (!this.carbonDimension.getUseActualData()) {
byte[] value = null;
if (isDirectDictionary) {
int surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
int surrogateKey;
// If the input is a long value then this means that logical type was provided by
// the user using AvroCarbonWriter. In this case directly generate surrogate key
// using dictionaryGenerator.
if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) {
surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input);
} else {
surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
}
if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
value = new byte[0];
} else {
value = ByteUtil.toBytes(surrogateKey);
}
} else {
value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
this.carbonDimension.getDataType(), dateFormat);
// If the input is a long value then this means that logical type was provided by
// the user using AvroCarbonWriter. In this case directly generate Bytes from value.
if (this.carbonDimension.getDataType().equals(DataTypes.DATE)
|| this.carbonDimension.getDataType().equals(DataTypes.TIMESTAMP)
&& input instanceof Long) {
if (dictionaryGenerator != null) {
value = ByteUtil.toBytes(((DirectDictionary) dictionaryGenerator)
.generateKey((long) input));
} else {
value = ByteUtil.toBytes(Long.parseLong(parsedValue));
}
} else {
value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
this.carbonDimension.getDataType(), dateFormat);
}
if (this.carbonDimension.getDataType() == DataTypes.STRING
&& value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
Expand All @@ -333,8 +358,15 @@ public int getSurrogateIndex() {
}
updateValueToByteStream(dataOutputStream, value);
} else {
Object value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
this.carbonDimension.getDataType(), dateFormat);
Object value;
if (dictionaryGenerator instanceof DirectDictionary
&& input instanceof Long) {
value = ByteUtil.toBytes(
((DirectDictionary) dictionaryGenerator).generateKey((long) input));
} else {
value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
this.carbonDimension.getDataType(), dateFormat);
}
if (this.carbonDimension.getDataType() == DataTypes.STRING
&& value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public Integer getKey(Object value) {
return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
}

/**
 * Generates the surrogate key directly from a raw {@code long} value,
 * bypassing string parsing. Delegates to the wrapped direct-dictionary
 * generator (used when the value already arrives as an epoch-style long,
 * e.g. from an Avro logical type).
 *
 * @param value raw long value to convert
 * @return the generated surrogate key
 */
public Integer generateKey(long value) {
  final int surrogate = dictionaryGenerator.generateKey(value);
  return surrogate;
}

@Override
public Object getValue(Integer key) {
return dictionaryGenerator.getValueFromSurrogate(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
Expand Down Expand Up @@ -215,6 +217,10 @@ private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatc

private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;

private DirectDictionaryGenerator dateDictionaryGenerator;

private DirectDictionaryGenerator timestampDictionaryGenerator;

public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
Expand Down Expand Up @@ -313,7 +319,23 @@ private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFie
throw new CarbonDataLoadingException("Loading Exception", e);
}
} else {
newData[i] = data[orderOfData[i]];
DataType dataType = dataFields[i].getColumn().getDataType();
if (dataType == DataTypes.DATE && data[orderOfData[i]] instanceof Long) {
if (dateDictionaryGenerator == null) {
dateDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(dataType, dataFields[i].getDateFormat());
}
newData[i] = dateDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
} else if (dataType == DataTypes.TIMESTAMP && data[orderOfData[i]] instanceof Long) {
if (timestampDictionaryGenerator == null) {
timestampDictionaryGenerator =
DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(dataType, dataFields[i].getTimestampFormat());
}
newData[i] = timestampDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
} else {
newData[i] = data[orderOfData[i]];
}
}
}
}
Expand Down

0 comments on commit 2f23486

Please sign in to comment.