Merge 3e3db52 into cd0ce41
kumarvishal09 committed Nov 14, 2018
2 parents cd0ce41 + 3e3db52 commit 83d8215
Showing 76 changed files with 1,243 additions and 715 deletions.
@@ -1946,6 +1946,12 @@ private CarbonCommonConstants() {
*/
public static final String CARBON_WRITTEN_BY_APPNAME = "carbon.writtenby.app.name";

/**
 * When there are many global dictionary columns, generating codegen for them
 * becomes an issue and slows down the query, so we limit it to 40 for now.
 */
public static final int CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT = 40;
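
A hedged sketch of the kind of check this limit enables (illustrative only: the planner-side code is not part of this commit, and dictionaryColumnCount / useDirectFill are made-up names):

// Illustrative only -- this commit adds the constant, not this check.
boolean useDirectFill =
    dictionaryColumnCount <= CarbonCommonConstants.CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT;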

//////////////////////////////////////////////////////////////////////////////////////////
// Unused constants and parameters start here
//////////////////////////////////////////////////////////////////////////////////////////
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datastore;

/**
 * A reusable byte[] holder: lets column page decompression reuse one buffer
 * across pages instead of allocating a fresh array for every page.
 */
public class ReusableDataBuffer {

  private byte[] dataBuffer;

  private int size;

  /**
   * Returns a buffer of at least requestedSize bytes. The current buffer is
   * reused when large enough; otherwise a new one is allocated with roughly
   * 30% headroom so that slightly larger follow-up requests avoid
   * reallocation. The returned array may be longer than requestedSize, so
   * callers must track the logical data length separately.
   */
  public byte[] getDataBuffer(int requestedSize) {
    if (dataBuffer == null || requestedSize > size) {
      this.size = requestedSize + ((requestedSize * 30) / 100);
      dataBuffer = new byte[size];
    }
    return dataBuffer;
  }
}
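
A short usage sketch of the reuse pattern this class enables (a condensed, hypothetical caller; pagesCount, rawBytes, pageOffsets and pageLengths are stand-ins, while the compressor calls mirror the ones used later in this diff):

// One buffer per column, handed to every page decode of the scan.
ReusableDataBuffer reusableBuffer = new ReusableDataBuffer();
for (int page = 0; page < pagesCount; page++) {
  int uncompressedSize =
      compressor.unCompressedLength(rawBytes, pageOffsets[page], pageLengths[page]);
  // Reuses the existing array when it is big enough; otherwise grows it once.
  byte[] dataPage = reusableBuffer.getDataBuffer(uncompressedSize);
  compressor.rawUncompress(rawBytes, pageOffsets[page], pageLengths[page], dataPage);
  // dataPage.length may exceed uncompressedSize, so the logical length is
  // carried separately (the new dataLength parameters in this commit).
}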
@@ -23,6 +23,7 @@

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
@@ -72,7 +73,7 @@ public DimensionColumnPage[] decodeAllColumnPages() {
for (int i = 0; i < pagesCount; i++) {
try {
if (dataChunks[i] == null) {
dataChunks[i] = chunkReader.decodeColumnPage(this, i);
dataChunks[i] = chunkReader.decodeColumnPage(this, i, null);
}
} catch (IOException | MemoryException e) {
throw new RuntimeException(e);
@@ -93,7 +94,7 @@ public DimensionColumnPage decodeColumnPage(int pageNumber) {
}
if (dataChunks[pageNumber] == null) {
try {
dataChunks[pageNumber] = chunkReader.decodeColumnPage(this, pageNumber);
dataChunks[pageNumber] = chunkReader.decodeColumnPage(this, pageNumber, null);
} catch (IOException | MemoryException e) {
throw new RuntimeException(e);
}
@@ -108,15 +109,16 @@ public DimensionColumnPage decodeColumnPage(int pageNumber) {
* @param index page number of the page to decode
* @return decoded page, which is not added to the cache
*/
public DimensionColumnPage convertToDimColDataChunkWithOutCache(int index) {
public DimensionColumnPage convertToDimColDataChunkWithOutCache(int index,
ReusableDataBuffer reusableDataBuffer) {
assert index < pagesCount;
// in case of a filter query, if the filter column is already decoded and stored,
// then return the same
if (dataChunks != null && null != dataChunks[index]) {
return dataChunks[index];
}
try {
return chunkReader.decodeColumnPage(this, index);
return chunkReader.decodeColumnPage(this, index, reusableDataBuffer);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -129,10 +131,11 @@ public DimensionColumnPage convertToDimColDataChunkWithOutCache(int index) {
* @param pageNumber page number to decode and fill the vector
* @param vectorInfo vector to be filled with column page
*/
public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo,
ReusableDataBuffer reusableDataBuffer) {
assert pageNumber < pagesCount;
try {
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo, reusableDataBuffer);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -191,7 +194,7 @@ public static CarbonDictionary getDictionary(LocalDictionaryChunk localDictionar
ColumnPageDecoder decoder = DefaultEncodingFactory.getInstance().createDecoder(
encodings, encoderMetas, compressor.getName());
ColumnPage decode = decoder.decode(localDictionaryChunk.getDictionary_data(), 0,
localDictionaryChunk.getDictionary_data().length);
localDictionaryChunk.getDictionary_data().length, null);
BitSet usedDictionary = BitSet.valueOf(compressor.unCompressByte(
localDictionaryChunk.getDictionary_values()));
int length = usedDictionary.length();
@@ -39,14 +39,14 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
* @param columnValueSize size of each column value
*/
public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, int columnValueSize) {
int[] invertedIndexReverse, int numberOfRows, int columnValueSize, int dataLength) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = isExplicitSorted ?
dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
dataChunk.length;
dataLength + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
dataLength;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
DimensionStoreType.FIXED_LENGTH, null, false);
DimensionStoreType.FIXED_LENGTH, null, false, dataLength);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
}

@@ -62,14 +62,14 @@ public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
*/
public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
ColumnVectorInfo vectorInfo) {
ColumnVectorInfo vectorInfo, int dataLength) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = isExplicitSorted ?
dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
dataChunk.length;
dataLength + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
dataLength;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null);
DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null, dataLength);
if (vectorInfo == null) {
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
} else {
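
The switch from dataChunk.length to dataLength in the totalSize computation above is the crux of these constructor changes: with a ReusableDataBuffer, the physical array can be longer than the bytes belonging to the current page. A minimal illustration (the sizes are made up):

// Physical vs. logical length when a buffer is reused.
ReusableDataBuffer buffer = new ReusableDataBuffer();
byte[] first = buffer.getDataBuffer(1000);  // allocates 1300 bytes (30% headroom)
byte[] second = buffer.getDataBuffer(1200); // the same 1300-byte array is returned
// second.length == 1300, but only the first 1200 bytes are valid page data,
// so the chunk store must be sized from dataLength, not from second.length.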
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -56,7 +57,7 @@ public ColumnPage[] decodeAllColumnPages() {
for (int i = 0; i < pagesCount; i++) {
try {
if (columnPages[i] == null) {
columnPages[i] = chunkReader.decodeColumnPage(this, i);
columnPages[i] = chunkReader.decodeColumnPage(this, i, null);
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -77,7 +78,7 @@ public ColumnPage decodeColumnPage(int pageNumber) {

try {
if (columnPages[pageNumber] == null) {
columnPages[pageNumber] = chunkReader.decodeColumnPage(this, pageNumber);
columnPages[pageNumber] = chunkReader.decodeColumnPage(this, pageNumber, null);
}
} catch (IOException | MemoryException e) {
throw new RuntimeException(e);
@@ -92,15 +93,16 @@ public ColumnPage decodeColumnPage(int pageNumber) {
* @param index page number of the page to decode
* @return decoded page, which is not added to the cache
*/
public ColumnPage convertToColumnPageWithOutCache(int index) {
public ColumnPage convertToColumnPageWithOutCache(int index,
ReusableDataBuffer reusableDataBuffer) {
assert index < pagesCount;
// in case of a filter query, the filter columns' blocklet pages are already
// uncompressed, so no need to decode again
if (null != columnPages && columnPages[index] != null) {
return columnPages[index];
}
try {
return chunkReader.decodeColumnPage(this, index);
return chunkReader.decodeColumnPage(this, index, reusableDataBuffer);
} catch (IOException | MemoryException e) {
throw new RuntimeException(e);
}
@@ -113,10 +115,11 @@ public ColumnPage convertToColumnPageWithOutCache(int index) {
* @param pageNumber page number to decode and fill the vector
* @param vectorInfo vector to be filled with column page
*/
public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo,
ReusableDataBuffer reusableDataBuffer) {
assert pageNumber < pagesCount;
try {
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo, reusableDataBuffer);
} catch (IOException | MemoryException e) {
throw new RuntimeException(e);
}
@@ -38,9 +38,9 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary) {
CarbonDictionary dictionary, int dataLength) {
this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
null);
null, dataLength);
}

/**
@@ -54,28 +54,28 @@ public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
CarbonDictionary dictionary, ColumnVectorInfo vectorInfo, int dataLength) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = 0;
switch (dimStoreType) {
case LOCAL_DICT:
totalSize = null != invertedIndex ?
(dataChunks.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE)) :
dataChunks.length;
(dataLength + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE)) :
dataLength;
break;
case VARIABLE_INT_LENGTH:
case VARIABLE_SHORT_LENGTH:
totalSize = null != invertedIndex ?
(dataChunks.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) + (
(dataLength + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) + (
numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE)) :
(dataChunks.length + (numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE));
(dataLength + (numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE));
break;
default:
throw new UnsupportedOperationException("Invalidate dimension store type");
}
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
dictionary, vectorInfo != null);
dictionary, vectorInfo != null, dataLength);
if (vectorInfo != null) {
dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
} else {
@@ -19,6 +19,7 @@
import java.io.IOException;

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.memory.MemoryException;
@@ -60,11 +61,12 @@ DimensionRawColumnChunk readRawDimensionChunk(FileReader fileReader, int columnI
* @throws IOException
*/
DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber) throws IOException, MemoryException;
int pageNumber, ReusableDataBuffer reusableDataBuffer) throws IOException, MemoryException;

/**
* Decodes the raw data chunk of given page number and fill the vector with decoded data.
*/
void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
int pageNumber, ColumnVectorInfo vectorInfo, ReusableDataBuffer reusableDataBuffer)
throws IOException, MemoryException;
}
@@ -19,6 +19,7 @@
import java.io.IOException;

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
@@ -56,13 +57,14 @@ MeasureRawColumnChunk readRawMeasureChunk(FileReader fileReader, int columnIndex
* @return
* @throws IOException
*/
ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber) throws IOException, MemoryException;
ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk, int pageNumber,
ReusableDataBuffer reusableDataBuffer) throws IOException, MemoryException;

/**
* Decode raw data and fill the vector
*/
void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk, int pageNumber,
ColumnVectorInfo vectorInfo, ReusableDataBuffer reusableDataBuffer)
throws IOException, MemoryException;

}
@@ -19,6 +19,7 @@
import java.io.IOException;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
@@ -86,7 +87,8 @@ public AbstractChunkReader(final int[] eachColumnValueSize, final String filePat

@Override
public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
int pageNumber, ColumnVectorInfo vectorInfo, ReusableDataBuffer reusableDataBuffer)
throws IOException, MemoryException {
throw new UnsupportedOperationException(
"This operation is not supported in this reader " + this.getClass().getName());
}
@@ -21,6 +21,7 @@
import java.util.List;

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
@@ -101,19 +102,26 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
return rawColumnChunk;
}

@Override public DimensionColumnPage decodeColumnPage(
DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
@Override
public DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ReusableDataBuffer reusableDataBuffer) throws IOException {
int blockIndex = dimensionRawColumnChunk.getColumnIndex();
byte[] dataPage = null;
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
int[] rlePage = null;
FileReader fileReader = dimensionRawColumnChunk.getFileReader();

ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
dataPage = compressor.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
dimensionRawColumnChunk.getLength());

int uncompressedSize = compressor
.unCompressedLength(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
dimensionRawColumnChunk.getLength());
if (null != reusableDataBuffer) {
dataPage = reusableDataBuffer.getDataBuffer(uncompressedSize);
} else {
dataPage = new byte[uncompressedSize];
}
compressor.rawUncompress(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
dimensionRawColumnChunk.getLength(), dataPage);
// if row id block is present then read the row id chunk and uncompress it
DataChunk dataChunk = dimensionColumnChunk.get(blockIndex);
if (CarbonUtil.hasEncoding(dataChunk.getEncodingList(),
@@ -144,7 +152,9 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
rlePage = numberCompressor
.unCompress(key, 0, dataChunk.getRlePageLength());
// uncompress the data with rle indexes
dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
dataPage = UnBlockIndexer
.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex], uncompressedSize);
uncompressedSize = dataPage.length;
rlePage = null;
}
// fill chunk attributes
@@ -156,12 +166,12 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
columnDataChunk =
new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
numberOfRows, DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_SHORT_LENGTH,
null);
null, uncompressedSize);
} else {
// to store fixed length column chunk values
columnDataChunk =
new FixedLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
numberOfRows, eachColumnValueSize[blockIndex]);
numberOfRows, eachColumnValueSize[blockIndex], uncompressedSize);
}
return columnDataChunk;
}
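
A hedged sketch of how the scan side can wire these buffers up (the wiring is not shown in this excerpt; projectedDimensionCount and columnIndex are illustrative names): one ReusableDataBuffer per projected column, created once per scanner and passed into every page decode, so a buffer is never shared between columns.

// One buffer per projected dimension column, reused for all of its pages.
ReusableDataBuffer[] dimensionBuffers = new ReusableDataBuffer[projectedDimensionCount];
for (int i = 0; i < dimensionBuffers.length; i++) {
  dimensionBuffers[i] = new ReusableDataBuffer();
}
// Later, while scanning page by page:
DimensionColumnPage page = rawColumnChunk
    .convertToDimColDataChunkWithOutCache(pageNumber, dimensionBuffers[columnIndex]);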
