Skip to content

Commit

Permalink
Merge 8f3cd07 into 94d2089
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed Sep 5, 2018
2 parents 94d2089 + 8f3cd07 commit cb9f4c6
Show file tree
Hide file tree
Showing 17 changed files with 327 additions and 90 deletions.
Expand Up @@ -937,6 +937,16 @@ public final class CarbonCommonConstants {
*/
public static final String LOCAL_DICTIONARY_SYSTEM_ENABLE = "carbon.local.dictionary.enable";

/**
 * System property to enable or disable decoder based local dictionary fallback.
 * When enabled, fallback re-encoding decodes the already encoded page
 * (DecoderBasedFallbackEncoder) instead of re-encoding from the kept actual data.
 */
public static final String LOCAL_DICTIONARY_DECODER_BASED_FALLBACK =
"carbon.local.dictionary.decoder.fallback";

/**
 * Default value of {@code LOCAL_DICTIONARY_DECODER_BASED_FALLBACK}: decoder based
 * fallback is enabled unless the property is explicitly set to "false".
 */
public static final String LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT = "true";
/**
* Threshold value for local dictionary
*/
Expand Down
Expand Up @@ -26,10 +26,12 @@

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.page.FallbackColumnPageEncoder;
import org.apache.carbondata.core.datastore.page.ActualDataBasedFallbackEncoder;
import org.apache.carbondata.core.datastore.page.DecoderBasedFallbackEncoder;
import org.apache.carbondata.core.datastore.page.FallbackEncodedColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.format.LocalDictionaryChunk;

Expand Down Expand Up @@ -75,9 +77,22 @@ public class BlockletEncodedColumnPage {

private String columnName;

BlockletEncodedColumnPage(ExecutorService fallbackExecutorService) {
/**
* is decoder based fallback enabled
*/
private boolean isDecoderBasedFallBackEnabled;

/**
* Local dictionary generator for column
*/
private LocalDictionaryGenerator localDictionaryGenerator;

/**
 * Creates a holder for all encoded pages of one column in a blocklet.
 *
 * @param fallbackExecutorService executor used to run local dictionary fallback tasks;
 *        callers pass null when fallback is not applicable (e.g. measure columns)
 * @param isDecoderBasedFallBackEnabled true to re-encode by decoding the encoded page,
 *        false to re-encode from the kept actual data
 * @param localDictionaryGenerator local dictionary generator of this column, needed by
 *        the decoder based fallback; may be null when fallback is disabled
 */
BlockletEncodedColumnPage(ExecutorService fallbackExecutorService,
    boolean isDecoderBasedFallBackEnabled, LocalDictionaryGenerator localDictionaryGenerator) {
  this.fallbackExecutorService = fallbackExecutorService;
  this.fallbackFutureQueue = new ArrayDeque<>();
  this.isDecoderBasedFallBackEnabled = isDecoderBasedFallBackEnabled;
  this.localDictionaryGenerator = localDictionaryGenerator;
}

/**
Expand All @@ -86,7 +101,7 @@ public class BlockletEncodedColumnPage {
* @param encodedColumnPage
* encoded column page
*/
void addEncodedColumnColumnPage(EncodedColumnPage encodedColumnPage) {
void addEncodedColumnPage(EncodedColumnPage encodedColumnPage) {
if (null == encodedColumnPageList) {
this.encodedColumnPageList = new ArrayList<>();
// if dimension page is local dictionary enabled and encoded with local dictionary
Expand All @@ -105,8 +120,7 @@ void addEncodedColumnColumnPage(EncodedColumnPage encodedColumnPage) {
LOGGER.info(
"Local dictionary Fallback is initiated for column: " + this.columnName + " for page:"
+ encodedColumnPageList.size());
fallbackFutureQueue.add(fallbackExecutorService
.submit(new FallbackColumnPageEncoder(encodedColumnPage, encodedColumnPageList.size())));
initiateFallBack(encodedColumnPage, encodedColumnPageList.size());
// fill null so once page is decoded again fill the re-encoded page again
this.encodedColumnPageList.add(null);
}
Expand All @@ -128,8 +142,7 @@ else if (!isLocalDictEncoded || encodedColumnPage.isLocalDictGeneratedPage()) {
// submit all the older pages encoded with dictionary for fallback
for (int pageIndex = 0; pageIndex < encodedColumnPageList.size(); pageIndex++) {
if (encodedColumnPageList.get(pageIndex).getActualPage().isLocalDictGeneratedPage()) {
fallbackFutureQueue.add(fallbackExecutorService.submit(
new FallbackColumnPageEncoder(encodedColumnPageList.get(pageIndex), pageIndex)));
initiateFallBack(encodedColumnPageList.get(pageIndex), pageIndex);
}
}
//add to page list
Expand Down Expand Up @@ -188,4 +201,19 @@ public LocalDictionaryChunk getEncodedDictionary() {
}
return null;
}

/**
 * Initiates fallback (re-encoding without local dictionary) for a local dictionary
 * encoded column page and queues the resulting future for later collection.
 *
 * @param encodedColumnPage local dictionary encoded page to be re-encoded
 * @param pageIndex index of the page inside this blocklet column; the fallback task
 *        uses it to place the re-encoded page back into the correct slot
 */
private void initiateFallBack(EncodedColumnPage encodedColumnPage, int pageIndex) {
  if (isDecoderBasedFallBackEnabled) {
    fallbackFutureQueue.add(fallbackExecutorService.submit(
        new DecoderBasedFallbackEncoder(encodedColumnPage, pageIndex, localDictionaryGenerator)));
  } else {
    // BUGFIX: use the caller supplied pageIndex, not encodedColumnPageList.size().
    // size() is only correct for the "append new page" caller (where the two are equal);
    // for the loop that falls back all OLDER dictionary pages it would re-encode every
    // page into the current tail slot instead of its own position.
    fallbackFutureQueue.add(fallbackExecutorService.submit(
        new ActualDataBasedFallbackEncoder(encodedColumnPage, pageIndex)));
  }
}
}
Expand Up @@ -18,10 +18,12 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.datastore.page.key.TablePageKey;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;

/**
* Holds the blocklet level data and metadata to be written in carbondata file
Expand Down Expand Up @@ -60,8 +62,21 @@ public class EncodedBlocklet {
*/
private int numberOfPages;

public EncodedBlocklet(ExecutorService executorService) {
/**
* is decoder based fallback is enabled or not
*/
private boolean isDecoderBasedFallBackEnabled;

/**
* local dictionary generator map of all local dictionary columns
*/
private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;

/**
 * Creates the blocklet level holder for encoded page data and metadata.
 *
 * @param executorService executor service used to submit local dictionary fallback tasks
 *        for dimension columns
 * @param isDecoderBasedFallBackEnabled true if fallback should decode the encoded page
 *        instead of re-encoding from the kept actual data
 * @param localDictionaryGeneratorMap local dictionary generator for each local dictionary
 *        column, keyed by column field name
 */
public EncodedBlocklet(ExecutorService executorService, boolean isDecoderBasedFallBackEnabled,
    Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap) {
  this.executorService = executorService;
  this.isDecoderBasedFallBackEnabled = isDecoderBasedFallBackEnabled;
  this.localDictionaryGeneratorMap = localDictionaryGeneratorMap;
}

/**
Expand Down Expand Up @@ -93,13 +108,14 @@ private void addEncodedMeasurePage(EncodedTablePage encodedTablePage) {
encodedMeasureColumnPages = new ArrayList<>();
// adding measure pages
for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) {
BlockletEncodedColumnPage blockletEncodedColumnPage = new BlockletEncodedColumnPage(null);
blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getMeasure(i));
BlockletEncodedColumnPage blockletEncodedColumnPage =
new BlockletEncodedColumnPage(null, false, null);
blockletEncodedColumnPage.addEncodedColumnPage(encodedTablePage.getMeasure(i));
encodedMeasureColumnPages.add(blockletEncodedColumnPage);
}
} else {
for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) {
encodedMeasureColumnPages.get(i).addEncodedColumnColumnPage(encodedTablePage.getMeasure(i));
encodedMeasureColumnPages.get(i).addEncodedColumnPage(encodedTablePage.getMeasure(i));
}
}
}
Expand All @@ -117,14 +133,16 @@ private void addEncodedDimensionPage(EncodedTablePage encodedTablePage) {
// adding measure pages
for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) {
BlockletEncodedColumnPage blockletEncodedColumnPage =
new BlockletEncodedColumnPage(executorService);
blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getDimension(i));
new BlockletEncodedColumnPage(executorService, isDecoderBasedFallBackEnabled,
localDictionaryGeneratorMap.get(
encodedTablePage.getDimension(i).getActualPage().getColumnSpec()
.getFieldName()));
blockletEncodedColumnPage.addEncodedColumnPage(encodedTablePage.getDimension(i));
encodedDimensionColumnPages.add(blockletEncodedColumnPage);
}
} else {
for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) {
encodedDimensionColumnPages.get(i)
.addEncodedColumnColumnPage(encodedTablePage.getDimension(i));
encodedDimensionColumnPages.get(i).addEncodedColumnPage(encodedTablePage.getDimension(i));
}
}
}
Expand Down
Expand Up @@ -80,19 +80,4 @@ public AbstractChunkReader(final int[] eachColumnValueSize, final String filePat
this.numberOfRows = numberOfRows;
}

/**
 * Builds the reverse mapping of an inverted index: for each position {@code i} of the
 * inverted index, the result stores {@code i} at slot {@code invertedIndex[i]}, so the
 * returned array maps a row's original position back to its position in the chunk data.
 *
 * @param invertedIndex inverted index
 * @return reverse inverted index
 */
protected int[] getInvertedReverseIndex(int[] invertedIndex) {
  int[] reverseIndex = new int[invertedIndex.length];
  int position = 0;
  for (int rowId : invertedIndex) {
    reverseIndex[rowId] = position++;
  }
  return reverseIndex;
}
}
Expand Up @@ -22,7 +22,6 @@
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.format.Encoding;

/**
* Abstract class for V2, V3 format dimension column reader
Expand Down Expand Up @@ -112,15 +111,4 @@ public AbstractChunkReaderV2V3Format(final BlockletInfo blockletInfo,
protected abstract DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fileReader,
int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;

/**
 * Tells whether a particular encoding has been applied on the dimension column.
 *
 * @param encodings encodings applied on the column
 * @param encoding encoding to search for
 * @return true if the encoding is present in the list
 */
protected boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
  // indexOf uses the same equals-based lookup as contains, so semantics are unchanged
  return encodings.indexOf(encoding) >= 0;
}

}
Expand Up @@ -125,7 +125,7 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
.getUnCompressColumnIndex(dataChunk.getRowIdPageLength(),
columnIndexData, numberComressor, 0);
// get the reverse index
invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
// if rle is applied then read the rle block chunk and then uncompress
//then actual data based on rle block
Expand Down
Expand Up @@ -147,7 +147,7 @@ public DimensionColumnPage decodeColumnPage(
.unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
copySourcePoint += dimensionColumnChunk.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
if (CarbonUtil.hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
byte[] dataInv = new byte[dimensionColumnChunk.rowid_page_length];
rawData.position(copySourcePoint);
rawData.get(dataInv);
Expand All @@ -156,11 +156,11 @@ public DimensionColumnPage decodeColumnPage(
numberComressor, 0);
copySourcePoint += dimensionColumnChunk.rowid_page_length;
// get the reverse index
invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
// if rle is applied then read the rle block chunk and then uncompress
//then actual data based on rle block
if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
if (CarbonUtil.hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
byte[] dataRle = new byte[dimensionColumnChunk.rle_page_length];
rawData.position(copySourcePoint);
rawData.get(dataRle);
Expand All @@ -173,7 +173,7 @@ public DimensionColumnPage decodeColumnPage(

// if no dictionary column then first create a no dictionary column chunk
// and set to data chunk instance
if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
if (!CarbonUtil.hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
columnDataChunk =
new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
numberOfRows, DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_SHORT_LENGTH,
Expand Down
Expand Up @@ -153,11 +153,11 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
.get(dimensionRawColumnChunk.getColumnIndex()) + dataChunk3.getPage_offset()
.get(pageNumber);
int length = pageMetadata.data_page_length;
if (hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
length += pageMetadata.rowid_page_length;
}

if (hasEncoding(pageMetadata.encoders, Encoding.RLE)) {
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.RLE)) {
length += pageMetadata.rle_page_length;
}
// get the data buffer
Expand Down
Expand Up @@ -276,16 +276,16 @@ private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawCol
dataPage = COMPRESSOR.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
offset += pageMetadata.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
if (hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
offset += pageMetadata.rowid_page_length;
// get the reverse index
invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
// if rle is applied then read the rle block chunk and then uncompress
//then actual data based on rle block
if (hasEncoding(pageMetadata.encoders, Encoding.RLE)) {
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.RLE)) {
rlePage =
CarbonUtil.getIntArray(pageData, offset, pageMetadata.rle_page_length);
// uncompress the data with rle indexes
Expand All @@ -298,11 +298,11 @@ private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawCol
DimensionColumnPage columnDataChunk = null;
// if no dictionary column then first create a no dictionary column chunk
// and set to data chunk instance
if (!hasEncoding(pageMetadata.encoders, Encoding.DICTIONARY)) {
if (!CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.DICTIONARY)) {
DimensionChunkStoreFactory.DimensionStoreType dimStoreType =
null != rawColumnPage.getLocalDictionary() ?
DimensionChunkStoreFactory.DimensionStoreType.LOCAL_DICT :
(hasEncoding(pageMetadata.encoders, Encoding.DIRECT_COMPRESS_VARCHAR) ?
(CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.DIRECT_COMPRESS_VARCHAR) ?
DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_INT_LENGTH :
DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_SHORT_LENGTH);
columnDataChunk =
Expand Down
Expand Up @@ -19,17 +19,17 @@
import java.util.concurrent.Callable;

import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
import org.apache.carbondata.core.util.CarbonUtil;

/**
* Below class will be used to encode column pages for which local dictionary was generated
* but all the pages in blocklet was not encoded with local dictionary.
* This is required as all the pages of a column in blocklet either it will be local dictionary
* encoded or without local dictionary encoded.
*/
public class FallbackColumnPageEncoder implements Callable<FallbackEncodedColumnPage> {
public class ActualDataBasedFallbackEncoder
implements Callable<FallbackEncodedColumnPage> {

/**
* actual local dictionary generated column page
Expand All @@ -44,39 +44,20 @@ public class FallbackColumnPageEncoder implements Callable<FallbackEncodedColumn
*/
private int pageIndex;

public FallbackColumnPageEncoder(EncodedColumnPage encodedColumnPage, int pageIndex) {
/**
 * Creates a fallback task that re-encodes a page from its kept actual (raw) data.
 *
 * @param encodedColumnPage local dictionary encoded page whose actual data will be
 *        re-encoded without local dictionary
 * @param pageIndex index of the page inside the blocklet, used to place the
 *        re-encoded page back at the right position
 */
public ActualDataBasedFallbackEncoder(EncodedColumnPage encodedColumnPage,
    int pageIndex) {
  this.encodedColumnPage = encodedColumnPage;
  this.pageIndex = pageIndex;
}

@Override public FallbackEncodedColumnPage call() throws Exception {
// disable encoding using local dictionary
encodedColumnPage.getActualPage().disableLocalDictEncoding();
// new encoded column page
EncodedColumnPage newEncodedColumnPage;

// get column spec for existing column page
TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
switch (columnSpec.getColumnType()) {
case COMPLEX_ARRAY:
case COMPLEX_STRUCT:
case COMPLEX:
throw new RuntimeException("Unsupported DataType. Only COMPLEX_PRIMITIVE should come");

case COMPLEX_PRIMITIVE:
// for complex type column
newEncodedColumnPage = ColumnPageEncoder.encodedColumn(
encodedColumnPage.getActualPage());
break;
default:
// for primitive column
ColumnPageEncoder columnPageEncoder = DefaultEncodingFactory.getInstance()
.createEncoder(encodedColumnPage.getActualPage().getColumnSpec(),
encodedColumnPage.getActualPage());
newEncodedColumnPage = columnPageEncoder.encode(encodedColumnPage.getActualPage());
}
FallbackEncodedColumnPage fallbackEncodedColumnPage =
new FallbackEncodedColumnPage(newEncodedColumnPage, pageIndex);
FallbackEncodedColumnPage fallbackEncodedColumnPage = CarbonUtil
.getFallBackEncodedColumnPage(encodedColumnPage.getActualPage(), pageIndex, columnSpec);
// here freeing the memory of raw column page as fallback is done and column page will not
// be used.
// This is required to free the memory once it is of no use
Expand Down
Expand Up @@ -165,15 +165,18 @@ public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataTyp
public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
int pageSize, LocalDictionaryGenerator localDictionaryGenerator,
boolean isComplexTypePrimitive) throws MemoryException {
boolean isDecoderBasedFallBackEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK,
CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT));
if (unsafe) {
return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize),
new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize,
CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE),
localDictionaryGenerator, isComplexTypePrimitive);
CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), localDictionaryGenerator,
isComplexTypePrimitive, isDecoderBasedFallBackEnabled);
} else {
return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize),
new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize),
localDictionaryGenerator, isComplexTypePrimitive);
localDictionaryGenerator, isComplexTypePrimitive, isDecoderBasedFallBackEnabled);
}
}

Expand Down

0 comments on commit cb9f4c6

Please sign in to comment.