Skip to content

Commit

Permalink
Merge b9aa271 into b588cb6
Browse files Browse the repository at this point in the history
  • Loading branch information
QiangCai committed Sep 3, 2018
2 parents b588cb6 + b9aa271 commit 5dcc340
Show file tree
Hide file tree
Showing 26 changed files with 350 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -632,12 +632,14 @@ public final class CarbonCommonConstants {
public static final String BYTE_TYPE = "ByteType";
public static final String SHORT_TYPE = "ShortType";
public static final String DECIMAL_TYPE = "DecimalType";
public static final String BINARY_TYPE = "BinaryType";
public static final String STRING = "String";

public static final String INTEGER = "Integer";
public static final String SHORT = "Short";
public static final String NUMERIC = "Numeric";
public static final String TIMESTAMP = "Timestamp";
public static final String BINARY = "Binary";
public static final String ARRAY = "array";
public static final String STRUCT = "struct";
public static final String MAP = "map";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataT
instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
} else if (dataType == DataTypes.STRING
|| dataType == DataTypes.BYTE_ARRAY
|| dataType == DataTypes.VARCHAR) {
|| dataType == DataTypes.VARCHAR
|| dataType == DataTypes.BINARY) {
instance = new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
} else {
throw new RuntimeException("Unsupported data dataType: " + dataType);
Expand All @@ -224,7 +225,8 @@ public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataT
instance = newDecimalPage(columnSpec, new byte[pageSize][]);
} else if (dataType == DataTypes.STRING
|| dataType == DataTypes.BYTE_ARRAY
|| dataType == DataTypes.VARCHAR) {
|| dataType == DataTypes.VARCHAR
|| dataType == DataTypes.BINARY) {
instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
} else {
throw new RuntimeException("Unsupported data dataType: " + dataType);
Expand Down Expand Up @@ -321,6 +323,12 @@ private static ColumnPage newFixedByteArrayPage(TableSpec.ColumnSpec columnSpec,
return fixLengthByteArrayPage;
}

/**
 * Create a column page for BINARY data from a region of a byte array.
 * This is a thin wrapper that forwards all arguments unchanged to
 * VarLengthColumnPageBase.newBinaryColumnPage.
 *
 * @param columnSpec column specification forwarded to the page factory
 * @param lvEncodedByteArray source bytes; presumably LV (Length Value) encoded, as the
 *                           parameter name suggests -- confirm against the writer side
 * @param offset position in lvEncodedByteArray forwarded as the region start
 * @param length value forwarded as the region length
 * @return the page built by VarLengthColumnPageBase.newBinaryColumnPage
 * @throws MemoryException declared because the delegate declares it
 */
private static ColumnPage newBinaryPage(TableSpec.ColumnSpec columnSpec,
byte[] lvEncodedByteArray, int offset, int length) throws MemoryException {
return VarLengthColumnPageBase.newBinaryColumnPage(
columnSpec, lvEncodedByteArray, offset, length);
}

/**
* Set byte values to page
*/
Expand Down Expand Up @@ -402,6 +410,8 @@ public void putData(int rowId, Object value) {
|| dataType == DataTypes.VARCHAR) {
putBytes(rowId, (byte[]) value);
statsCollector.update((byte[]) value);
} else if (dataType == DataTypes.BINARY) {
putBytes(rowId, (byte[]) value);
} else {
throw new RuntimeException("unsupported data type: " + dataType);
}
Expand Down Expand Up @@ -707,6 +717,8 @@ public byte[] compress(Compressor compressor) throws MemoryException, IOExceptio
|| columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE
|| columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
return compressor.compressByte(getComplexParentFlattenedBytePage());
} else if (dataType == DataTypes.BINARY) {
return getLVFlattenedBytePage();
} else if (dataType == DataTypes.BYTE_ARRAY) {
return compressor.compressByte(getLVFlattenedBytePage());
} else {
Expand Down Expand Up @@ -771,6 +783,8 @@ public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compresse
} else if (storeDataType == DataTypes.BYTE_ARRAY) {
byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
} else if (storeDataType == DataTypes.BINARY) {
return newBinaryPage(columnSpec, compressedData, offset, length);
} else {
throw new UnsupportedOperationException(
"unsupport uncompress column page: " + meta.getStoreDataType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;

/**
* This extension uses unsafe memory to store page data, for variable length data type (string)
Expand All @@ -36,7 +37,11 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
UnsafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
throws MemoryException {
super(columnSpec, dataType, pageSize);
capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
if (dataType == DataTypes.BINARY) {
capacity = (int) (pageSize * DEFAULT_BINARY_SIZE * FACTOR);
} else {
capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
}
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.carbondata.core.datastore.page;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
Expand All @@ -41,6 +43,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
static final int longBits = DataTypes.LONG.getSizeBits();
// default size for each row, grows as needed
static final int DEFAULT_ROW_SIZE = 8;
static final int DEFAULT_BINARY_SIZE = 512;

static final double FACTOR = 1.25;

Expand Down Expand Up @@ -139,6 +142,12 @@ static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] l
return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
}

/**
 * Create a new BINARY column page based on the LV (Length Value) encoded bytes.
 * Delegates to getBinaryColumnPage, fixing the data type to DataTypes.BINARY.
 *
 * @param columnSpec column specification forwarded to getBinaryColumnPage
 * @param lvEncodedBytes array containing the encoded rows
 * @param offset forwarded as the start position inside lvEncodedBytes
 * @param length forwarded as the length argument of getBinaryColumnPage
 */
static ColumnPage newBinaryColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
int offset, int length)
throws MemoryException {
return getBinaryColumnPage(columnSpec, lvEncodedBytes, DataTypes.BINARY, offset, length);
}

/**
* Create a new column page based on the LV (Length Value) encoded bytes
*/
Expand Down Expand Up @@ -267,6 +276,50 @@ private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSp
return page;
}

/**
 * Build a variable length column page from LV (Length Value) encoded bytes.
 * Each row in the input is a 4-byte length (decoded with ByteUtil.toInt) followed by
 * that many bytes of data. The encoded region is [start, start + len).
 *
 * The input is walked twice: the first pass records every row's length and its offset
 * in the decoded data, the second pass copies the row bytes into the new page.
 *
 * @param columnSpec column specification used for the created pages
 * @param lvEncodedBytes array containing the LV encoded rows
 * @param dataType data type of the page to create
 * @param start index in lvEncodedBytes where the first length field begins
 * @param len number of bytes of LV encoded data, counted from start
 * @return a page holding one entry per encoded row
 * @throws MemoryException declared by the page constructors and ColumnPage.newPage
 */
private static ColumnPage getBinaryColumnPage(TableSpec.ColumnSpec columnSpec,
    byte[] lvEncodedBytes, DataType dataType, int start, int len) throws MemoryException {
  // NOTE(review): this page only receives int offsets through putInt, yet it is created
  // with DataTypes.BINARY -- confirm that the page type returned for BINARY accepts
  // putInt (an INT page may have been intended).
  ColumnPage rowOffset = ColumnPage.newPage(columnSpec, DataTypes.BINARY,
      CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);

  // len is a length, not an end index: the cursor below is absolute (it starts at
  // start), so it has to be compared against start + len.
  final int end = start + len;

  List<Integer> rowLength = new ArrayList<>();
  int length;
  int offset;
  int lvEncodedOffset = start;
  int numRows = 0;
  // first pass: extract the Length field of every row and accumulate the total length
  for (offset = 0; lvEncodedOffset < end; offset += length) {
    length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
    rowOffset.putInt(numRows, offset);
    rowLength.add(length);
    // skip the 4-byte length field and the row data
    lvEncodedOffset += 4 + length;
    numRows++;
  }
  // one trailing entry holding the total decoded length
  rowOffset.putInt(numRows, offset);

  VarLengthColumnPageBase page;
  if (unsafe) {
    page = new UnsafeVarLengthColumnPage(columnSpec, dataType, numRows);
  } else {
    page = new SafeVarLengthColumnPage(columnSpec, dataType, numRows);
  }
  // set total length and rowOffset in page
  page.totalLength = offset;
  page.rowOffset = rowOffset;

  // second pass: copy the data of every row into the page
  lvEncodedOffset = start;
  for (int i = 0; i < numRows; i++) {
    length = rowLength.get(i);
    page.putBytes(i, lvEncodedBytes, lvEncodedOffset + 4, length);
    lvEncodedOffset += 4 + length;
  }
  return page;
}

@Override
public void putByte(int rowId, byte value) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void writeMinMax(DataOutput out) throws IOException {
out.writeInt(-1);
out.writeInt(-1);
}
} else if (dataType == DataTypes.BYTE_ARRAY) {
} else if (dataType == DataTypes.BYTE_ARRAY || dataType == DataTypes.BINARY) {
// complex types (BYTE_ARRAY) and BINARY come here; stats are not written for them
// TODO: support stats for complex type
} else {
Expand Down Expand Up @@ -189,7 +189,7 @@ private void readMinMax(DataInput in) throws IOException {
in.readInt();
// precision field is obsoleted. It is stored in the schema data type in columnSpec
in.readInt();
} else if (dataType == DataTypes.BYTE_ARRAY) {
} else if (dataType == DataTypes.BYTE_ARRAY || dataType == DataTypes.BINARY) {
// complex types (BYTE_ARRAY) and BINARY come here; no stats were written for them
// TODO: support stats for complex type
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpe
}

private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
if (columnPage.getDataType() == DataTypes.BINARY) {
return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
}
SimpleStatsResult stats = columnPage.getStatistics();
DataType dataType = stats.getDataType();
if (dataType == DataTypes.BOOLEAN) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ private org.apache.carbondata.format.DataType fromWrapperToExternalDataType(Data
return org.apache.carbondata.format.DataType.DATE;
} else if (dataType.getId() == DataTypes.TIMESTAMP.getId()) {
return org.apache.carbondata.format.DataType.TIMESTAMP;
} else if (dataType.getId() == DataTypes.BINARY.getId()) {
return org.apache.carbondata.format.DataType.BINARY;
} else if (DataTypes.isArrayType(dataType)) {
return org.apache.carbondata.format.DataType.ARRAY;
} else if (DataTypes.isStructType(dataType)) {
Expand Down Expand Up @@ -494,6 +496,8 @@ private DataType fromExternalToWrapperDataType(org.apache.carbondata.format.Data
return DataTypes.TIMESTAMP;
case DATE:
return DataTypes.DATE;
case BINARY:
return DataTypes.BINARY;
case ARRAY:
return DataTypes.createDefaultArrayType();
case STRUCT:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.metadata.datatype;
/**
 * Data type descriptor for BINARY columns.
 * Only one instance exists; it is exposed to other code as DataTypes.BINARY.
 */
public class BinaryType extends DataType {

  // not instantiable from outside, the only instance is the BINARY constant below
  private BinaryType(int id, int precedenceOrder, String name, int sizeInBytes) {
    super(id, precedenceOrder, name, sizeInBytes);
  }

  // NOTE(review): sizeInBytes of -1 presumably marks a variable length type -- confirm
  static final DataType BINARY =
      new BinaryType(DataTypes.BINARY_TYPE_ID, 13, "BINARY", -1);

  /**
   * Invoked by java serialization after an instance is read; substituting the shared
   * constant keeps the type a singleton across serialization.
   */
  private Object readResolve() {
    return DataTypes.BINARY;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public static char convertType(DataType dataType) {
return DATE_CHAR;
} else if (dataType == DataTypes.BYTE_ARRAY) {
return BYTE_ARRAY_CHAR;
} else if (dataType == DataTypes.BINARY) {
return BYTE_ARRAY_CHAR;
} else {
throw new RuntimeException("Unexpected type: " + dataType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DataTypes {
public static final DataType DOUBLE = DoubleType.DOUBLE;
public static final DataType NULL = NullType.NULL;
public static final DataType BYTE = ByteType.BYTE;
public static final DataType BINARY = BinaryType.BINARY;

// internal use only, for variable length data type
public static final DataType BYTE_ARRAY = ByteArrayType.BYTE_ARRAY;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class DataTypes {
public static final int STRUCT_TYPE_ID = 12;
public static final int MAP_TYPE_ID = 13;
public static final int VARCHAR_TYPE_ID = 18;
public static final int BINARY_TYPE_ID = 19;

/**
* create a DataType instance from uniqueId of the DataType
Expand Down Expand Up @@ -102,6 +104,8 @@ public static DataType valueOf(int id) {
return NULL;
} else if (id == DECIMAL_TYPE_ID) {
return createDefaultDecimalType();
} else if (id == BINARY.getId()) {
return BINARY;
} else if (id == ARRAY_TYPE_ID) {
return createDefaultArrayType();
} else if (id == STRUCT_TYPE_ID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,58 @@ public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
}
}

/**
 * Measure vector filler for binary values: copies byte[] rows of a column page into a
 * column vector, writing null for every row flagged in the page's null bit set.
 */
public static class BinaryVectorFiller implements MeasureVectorFiller {

  /**
   * Fill the vector with the consecutive page rows
   * [info.offset, info.offset + info.size), writing from info.vectorOffset onwards.
   */
  @Override
  public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) {
    final CarbonColumnVector target = info.vector;
    final BitSet nulls = dataChunk.getNullBits();
    // checked once up front so pages without nulls never consult the bit set per row
    final boolean hasNulls = !nulls.isEmpty();
    final int end = info.offset + info.size;
    int writePos = info.vectorOffset;
    for (int row = info.offset; row < end; row++, writePos++) {
      copyRow(dataChunk, nulls, hasNulls, row, target, writePos);
    }
  }

  /**
   * Fill the vector with the page rows selected by
   * filteredRowId[info.offset .. info.offset + info.size - 1], writing from
   * info.vectorOffset onwards.
   */
  @Override
  public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
      ColumnVectorInfo info) {
    final CarbonColumnVector target = info.vector;
    final BitSet nulls = dataChunk.getNullBits();
    final boolean hasNulls = !nulls.isEmpty();
    final int end = info.offset + info.size;
    int writePos = info.vectorOffset;
    for (int idx = info.offset; idx < end; idx++, writePos++) {
      copyRow(dataChunk, nulls, hasNulls, filteredRowId[idx], target, writePos);
    }
  }

  // writes one page row to the vector position: null if flagged, the row bytes otherwise
  private static void copyRow(ColumnPage page, BitSet nulls, boolean hasNulls, int row,
      CarbonColumnVector target, int writePos) {
    if (hasNulls && nulls.get(row)) {
      target.putNull(writePos);
    } else {
      target.putBytes(writePos, page.getBytes(row));
    }
  }
}

public static class DefaultMeasureVectorFiller implements MeasureVectorFiller {

@Override
Expand Down Expand Up @@ -370,6 +422,8 @@ public static MeasureVectorFiller getMeasureVectorFiller(DataType dataType) {
return new LongMeasureVectorFiller();
} else if (DataTypes.isDecimal(dataType)) {
return new DecimalMeasureVectorFiller();
} else if (dataType == DataTypes.BINARY) {
return new BinaryVectorFiller();
} else {
return new DefaultMeasureVectorFiller();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ private static int compareMeasureData(byte[] first, byte[] second, DataType data
compare = -1;
}
return (int) compare;
} else if (dataType == DataTypes.BINARY) {
return 0;
} else if (DataTypes.isDecimal(dataType)) {
return DataTypeUtil.byteToBigDecimal(first).compareTo(DataTypeUtil.byteToBigDecimal(second));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,7 @@ public static byte[] getValueAsBytes(DataType dataType, Object value) {
return b.array();
} else if (DataTypes.isDecimal(dataType)) {
return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
} else if (dataType == DataTypes.BYTE_ARRAY) {
} else if (dataType == DataTypes.BYTE_ARRAY || dataType == DataTypes.BINARY) {
return (byte[]) value;
} else if (dataType == DataTypes.STRING
|| dataType == DataTypes.DATE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,8 @@ public static DataType valueOf(DataType dataType, int precision, int scale) {
return DataTypes.BYTE_ARRAY;
} else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(dataType.getName())) {
return DataTypes.BYTE_ARRAY;
} else if (DataTypes.BINARY.getName().equalsIgnoreCase(dataType.getName())) {
return DataTypes.BINARY;
} else if (dataType.getName().equalsIgnoreCase("decimal")) {
return DataTypes.createDecimalType(precision, scale);
} else if (dataType.getName().equalsIgnoreCase("array")) {
Expand Down
1 change: 1 addition & 0 deletions format/src/main/thrift/schema.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ enum DataType {
TIMESTAMP = 6,
DATE = 7,
BOOLEAN = 8,
BINARY = 19,
ARRAY = 20,
STRUCT = 21,
VARCHAR = 22,
Expand Down

Large diffs are not rendered by default.

Loading

0 comments on commit 5dcc340

Please sign in to comment.