Skip to content

Commit

Permalink
[CARBONDATA-2587][CARBONDATA-2588] Local Dictionary Data Loading support
Browse files Browse the repository at this point in the history
What changes are proposed in this PR

Added code to support Local Dictionary Data Loading for primitive type
Added code to support Local Dictionary Data Loading for complex type.
How this PR was tested
Manual testing was done on a 3-node cluster setup.
Unit tests will be added in a separate PR.

This closes #2402
  • Loading branch information
kumarvishal09 authored and ravipesala committed Jun 27, 2018
1 parent 5804d75 commit e710339
Show file tree
Hide file tree
Showing 41 changed files with 2,001 additions and 354 deletions.
Expand Up @@ -89,4 +89,8 @@ public DictionaryByteArrayWrapper(byte[] data, XXHash32 xxHash32) {
result = 31 * result;
return result;
}

/**
 * Returns the wrapped byte array.
 *
 * @return the backing byte array held by this wrapper (the field itself, not a copy;
 *         callers must not mutate it)
 */
public byte[] getData() {
  return data;
}
}
Expand Up @@ -37,7 +37,9 @@ public enum ColumnType {

COMPLEX_ARRAY,

COMPLEX_PRIMITIVE;
COMPLEX_PRIMITIVE,

PLAIN_LONG_VALUE;

public static ColumnType valueOf(int ordinal) {
if (ordinal == GLOBAL_DICTIONARY.ordinal()) {
Expand All @@ -56,6 +58,8 @@ public static ColumnType valueOf(int ordinal) {
return COMPLEX_ARRAY;
} else if (ordinal == COMPLEX_PRIMITIVE.ordinal()) {
return COMPLEX_PRIMITIVE;
} else if (ordinal == PLAIN_LONG_VALUE.ordinal()) {
return PLAIN_LONG_VALUE;
} else {
throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal);
}
Expand Down
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.blocklet;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.page.FallbackColumnPageEncoder;
import org.apache.carbondata.core.datastore.page.FallbackEncodedColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.format.LocalDictionaryChunk;

/**
* Maintains the list of encoded page of a column in a blocklet
* and encoded dictionary values only if column is encoded using local
* dictionary
* Handle the fallback if all the pages in blocklet are not
* encoded with local dictionary
*/
public class BlockletEncodedColumnPage {

  /**
   * LOGGER
   */
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(BlockletEncodedColumnPage.class.getName());

  /**
   * list of encoded page of a column in a blocklet
   */
  private List<EncodedColumnPage> encodedColumnPageList;

  /**
   * executor service used to re-encode local-dictionary pages when fallback is triggered
   */
  private ExecutorService fallbackExecutorService;

  /**
   * whether the pages added so far are local dictionary encoded
   */
  private boolean isLocalDictEncoded;

  /**
   * page level dictionary; non-null only while the column is local dictionary encoded
   */
  private PageLevelDictionary pageLevelDictionary;

  /**
   * queue of pending fallback re-encoding tasks; non-null only after fallback is initiated
   */
  private ArrayDeque<Future<FallbackEncodedColumnPage>> fallbackFutureQueue;

  BlockletEncodedColumnPage(ExecutorService fallbackExecutorService) {
    this.fallbackExecutorService = fallbackExecutorService;
  }

  /**
   * Below method will be used to add column page of a column.
   * If earlier pages were local-dictionary encoded and this page is not,
   * a fallback is initiated: every earlier page is submitted for re-encoding
   * without dictionary.
   *
   * @param encodedColumnPage
   * encoded column page
   */
  void addEncodedColumnColumnPage(EncodedColumnPage encodedColumnPage) {
    if (null == encodedColumnPageList) {
      this.encodedColumnPageList = new ArrayList<>();
      // if dimension page is local dictionary enabled and encoded with local dictionary
      if (encodedColumnPage.isLocalDictGeneratedPage()) {
        this.isLocalDictEncoded = true;
        // get first page dictionary
        this.pageLevelDictionary = encodedColumnPage.getPageDictionary();
      }
      encodedColumnPageList.add(encodedColumnPage);
      return;
    }
    // if local dictionary is false or column is encoded with local dictionary then
    // add a page
    if (!isLocalDictEncoded || encodedColumnPage.isLocalDictGeneratedPage()) {
      this.encodedColumnPageList.add(encodedColumnPage);
      // merge page level dictionary values
      if (null != this.pageLevelDictionary) {
        pageLevelDictionary.mergerDictionaryValues(encodedColumnPage.getPageDictionary());
      }
    } else {
      // if older pages were encoded with dictionary and new pages are without dictionary
      isLocalDictEncoded = false;
      pageLevelDictionary = null;
      this.fallbackFutureQueue = new ArrayDeque<>();
      LOGGER.info(
          "Local dictionary Fallback is initiated for column: " + encodedColumnPageList.get(0)
              .getActualPage().getColumnSpec().getFieldName());
      // submit all the older pages encoded with dictionary for fallback
      for (int pageIndex = 0; pageIndex < encodedColumnPageList.size(); pageIndex++) {
        fallbackFutureQueue.add(fallbackExecutorService.submit(
            new FallbackColumnPageEncoder(encodedColumnPageList.get(pageIndex), pageIndex)));
      }
      //add to page list
      this.encodedColumnPageList.add(encodedColumnPage);
    }
  }

  /**
   * Return the list of encoded page list for a column in a blocklet.
   * Blocks until any pending fallback re-encoding tasks have finished and
   * their results have replaced the original dictionary-encoded pages.
   *
   * @return list of encoded page list
   */
  public List<EncodedColumnPage> getEncodedColumnPageList() {
    // if fallback queue is not null then fallback was initiated for some pages
    if (null != this.fallbackFutureQueue) {
      try {
        // drain the queue: each task yields one re-encoded page
        while (!fallbackFutureQueue.isEmpty()) {
          // remove the head element of the queue and wait for its result
          // NOTE: poll exactly once per iteration; a second poll here would
          // silently drop every other pending fallback page
          FallbackEncodedColumnPage fallbackEncodedColumnPage = fallbackFutureQueue.poll().get();
          // replace the old dictionary-encoded page with the re-encoded one
          encodedColumnPageList.set(fallbackEncodedColumnPage.getPageIndex(),
              fallbackEncodedColumnPage.getEncodedColumnPage());
        }
      } catch (InterruptedException e) {
        // restore the interrupt status before propagating
        Thread.currentThread().interrupt();
        throw new RuntimeException("Problem while encoding the blocklet data during fallback", e);
      } catch (ExecutionException e) {
        throw new RuntimeException("Problem while encoding the blocklet data during fallback", e);
      }
      // setting to null as all the fallback encoded page has been added to list
      fallbackFutureQueue = null;
    }
    // in case of dictionary encoded column page memory will be freed only after
    // all the pages are added in a blocklet, as fallback can happen anytime so old pages memory
    // cannot be freed, so after encoding is done we can free the page memory
    if (null != pageLevelDictionary) {
      // clear the memory footprint for local dictionary encoded pages
      for (EncodedColumnPage columnPage : encodedColumnPageList) {
        columnPage.freeMemory();
      }
    }
    return encodedColumnPageList;
  }

  /**
   * Below method will be used to get the encoded dictionary
   * values for local dictionary generated columns
   *
   * @return encoded dictionary values if column is local dictionary generated,
   *         null otherwise
   */
  public LocalDictionaryChunk getEncodedDictionary() {
    if (null != pageLevelDictionary) {
      try {
        return pageLevelDictionary.getLocalDictionaryChunkForBlocklet();
      } catch (IOException | MemoryException e) {
        throw new RuntimeException(e);
      }
    }
    return null;
  }
}
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.blocklet;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.datastore.page.key.TablePageKey;

/**
* Holds the blocklet level data and metadata to be written in carbondata file
* For dimension pages it will check if all the pages are not encoded with dictionary
* then it will encode those pages for that column again
*/
public class EncodedBlocklet {

  /**
   * total number of rows accumulated in this blocklet
   */
  private int blockletSize;

  /**
   * page key metadata of every table page added so far
   */
  private List<TablePageKey> pageMetadataList;

  /**
   * one holder per dimension column, each accumulating that column's encoded pages
   */
  private List<BlockletEncodedColumnPage> encodedDimensionColumnPages;

  /**
   * one holder per measure column, each accumulating that column's encoded pages
   */
  private List<BlockletEncodedColumnPage> encodedMeasureColumnPages;

  /**
   * executor service handed to dimension column holders for fallback re-encoding
   */
  private ExecutorService executorService;

  /**
   * count of table pages added to this blocklet
   */
  private int numberOfPages;

  public EncodedBlocklet(ExecutorService executorService) {
    this.executorService = executorService;
  }

  /**
   * Records page-level metadata (row count and page key) for one table page.
   *
   * @param encodedTablePage
   * encoded table page
   */
  private void addPageMetadata(EncodedTablePage encodedTablePage) {
    // lazily create the metadata list on the first page
    if (pageMetadataList == null) {
      pageMetadataList = new ArrayList<>();
    }
    blockletSize += encodedTablePage.getPageSize();
    pageMetadataList.add(encodedTablePage.getPageKey());
    numberOfPages++;
  }

  /**
   * Appends each measure column page of the given table page to its
   * per-column holder, creating the holders on the first page.
   *
   * @param encodedTablePage
   * encoded table page
   */
  private void addEncodedMeasurePage(EncodedTablePage encodedTablePage) {
    int measureCount = encodedTablePage.getNumMeasures();
    if (encodedMeasureColumnPages == null) {
      // first page: create one holder per measure column
      // (no fallback executor is supplied for measure columns)
      encodedMeasureColumnPages = new ArrayList<>();
      for (int index = 0; index < measureCount; index++) {
        BlockletEncodedColumnPage columnPageHolder = new BlockletEncodedColumnPage(null);
        columnPageHolder.addEncodedColumnColumnPage(encodedTablePage.getMeasure(index));
        encodedMeasureColumnPages.add(columnPageHolder);
      }
    } else {
      // subsequent pages: append to the existing holders
      for (int index = 0; index < measureCount; index++) {
        encodedMeasureColumnPages.get(index)
            .addEncodedColumnColumnPage(encodedTablePage.getMeasure(index));
      }
    }
  }

  /**
   * Appends each dimension column page of the given table page to its
   * per-column holder, creating the holders on the first page.
   *
   * @param encodedTablePage
   * encoded table page
   */
  private void addEncodedDimensionPage(EncodedTablePage encodedTablePage) {
    int dimensionCount = encodedTablePage.getNumDimensions();
    if (encodedDimensionColumnPages == null) {
      // first page: create one holder per dimension column,
      // each wired with the fallback executor service
      encodedDimensionColumnPages = new ArrayList<>();
      for (int index = 0; index < dimensionCount; index++) {
        BlockletEncodedColumnPage columnPageHolder =
            new BlockletEncodedColumnPage(executorService);
        columnPageHolder.addEncodedColumnColumnPage(encodedTablePage.getDimension(index));
        encodedDimensionColumnPages.add(columnPageHolder);
      }
    } else {
      // subsequent pages: append to the existing holders
      for (int index = 0; index < dimensionCount; index++) {
        encodedDimensionColumnPages.get(index)
            .addEncodedColumnColumnPage(encodedTablePage.getDimension(index));
      }
    }
  }

  /**
   * Adds one encoded table page: records its metadata, then distributes its
   * dimension and measure column pages to the per-column holders.
   *
   * @param encodedTablePage
   * encoded table page
   */
  public void addEncodedTablePage(EncodedTablePage encodedTablePage) {
    addPageMetadata(encodedTablePage);
    addEncodedDimensionPage(encodedTablePage);
    addEncodedMeasurePage(encodedTablePage);
  }

  public int getBlockletSize() {
    return blockletSize;
  }

  public List<TablePageKey> getPageMetadataList() {
    return pageMetadataList;
  }

  public List<BlockletEncodedColumnPage> getEncodedDimensionColumnPages() {
    return encodedDimensionColumnPages;
  }

  public List<BlockletEncodedColumnPage> getEncodedMeasureColumnPages() {
    return encodedMeasureColumnPages;
  }

  public int getNumberOfDimension() {
    return encodedDimensionColumnPages.size();
  }

  public int getNumberOfMeasure() {
    return encodedMeasureColumnPages.size();
  }

  public int getNumberOfPages() {
    return numberOfPages;
  }

  /**
   * Resets all accumulated state so this instance can be reused for the next blocklet.
   */
  public void clear() {
    this.numberOfPages = 0;
    this.encodedDimensionColumnPages = null;
    this.blockletSize = 0;
    this.encodedMeasureColumnPages = null;
    this.pageMetadataList = null;
  }
}
Expand Up @@ -242,7 +242,8 @@ protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnP
}

private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
ByteBuffer pageData, DataChunk2 pageMetadata, int offset) {
ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
MemoryException {
byte[] dataPage;
int[] rlePage;
int[] invertedIndexes = new int[0];
Expand Down

0 comments on commit e710339

Please sign in to comment.