Commit
Merge 713025c into bd752e9
BJangir committed Dec 21, 2018
2 parents bd752e9 + 713025c commit 1c1dee4
Showing 5 changed files with 104 additions and 57 deletions.
@@ -19,8 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.*;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
@@ -68,6 +67,10 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
private AbstractDetailQueryResultIterator iterator;

private QueryModel queryModel;
// Holds the mapping from each projection column's index to the vector index that is actually fetched.
// It is used when the same column appears in the projection multiple times, so that the column is fetched only once.
private List<Integer> projectionMapping = new ArrayList<>();


public CarbonVectorizedRecordReader(QueryModel queryModel) {
this.queryModel = queryModel;
@@ -155,38 +158,57 @@ private void initBatch() {
}
}
CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];

Map<String, Integer> colmap = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
vectors[i] = new CarbonColumnVectorImpl(
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
fields[i].getDataType());
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
fields[i].getDataType());
if (colmap.containsKey(fields[i].getFieldName())) {
int reusedIndex = colmap.get(fields[i].getFieldName());
projectionMapping.add(reusedIndex);
} else {
colmap.put(fields[i].getFieldName(), i);
projectionMapping.add(i);
}
}
carbonColumnarBatch = new CarbonColumnarBatch(vectors,
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
new boolean[] {});
}
}

// If the same column appears in the projection multiple times, the logic below scans it only once.
// Ex.: projection cols = C1,C2,C3,C2 -> projectionMapping holds [0,1,2,1].
// The row is formed based on projectionMapping.
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
rowCount += 1;
Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) {
Object data = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
|| carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
if (data == null) {
row[i] = null;
} else {
row[i] = ByteUtil.toString((byte[]) data, 0, (((byte[]) data).length));
}
} else if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.BOOLEAN) {
if (data == null) {
row[i] = null;
Object[] row = new Object[projectionMapping.size()];
for (int i = 0; i < projectionMapping.size(); i ++) {
// projectionMapping.get(i) < i means this column's value has already been fetched
if (projectionMapping.get(i) < i) {
row[i] = row[projectionMapping.get(i)];
} else {
Object data = carbonColumnarBatch.columnVectors[projectionMapping.get(i)]
.getData(batchIdx - 1);
if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
|| carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
if (data == null) {
row[i] = null;
} else {
row[i] = ByteUtil.toString((byte[]) data, 0, (((byte[]) data).length));
}
} else if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.BOOLEAN) {
if (data == null) {
row[i] = null;
} else {
row[i] = ByteUtil.toBoolean((byte) data);
}
} else {
row[i] = ByteUtil.toBoolean((byte) data);
row[i] = carbonColumnarBatch.columnVectors[projectionMapping.get(i)]
.getData(batchIdx - 1);
}
} else {
row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
}
}
return row;
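
For reference (not part of this commit), here is a minimal, self-contained sketch of the projection-mapping idea above, using only standard Java collections. The class name, sample values, and the fetched array are made up for illustration; the C1,C2,C3,C2 projection mirrors the example in the comment.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ProjectionMappingSketch {
      public static void main(String[] args) {
        // Hypothetical projection with a repeated column: C1, C2, C3, C2.
        String[] projection = {"C1", "C2", "C3", "C2"};
        Map<String, Integer> colmap = new HashMap<>();
        List<Integer> projectionMapping = new ArrayList<>();
        for (int i = 0; i < projection.length; i++) {
          // First occurrence keeps its own index; a repeat points back to it.
          Integer firstIndex = colmap.putIfAbsent(projection[i], i);
          projectionMapping.add(firstIndex == null ? i : firstIndex);
        }
        System.out.println(projectionMapping); // [0, 1, 2, 1]

        // Values as they would come from the column vectors; only the slots for
        // first occurrences (0, 1, 2) are ever read.
        Object[] fetched = {"v1", "v2", "v3", null};
        Object[] row = new Object[projectionMapping.size()];
        for (int i = 0; i < projectionMapping.size(); i++) {
          int src = projectionMapping.get(i);
          // An entry smaller than its position means the value was already fetched.
          row[i] = (src < i) ? row[src] : fetched[src];
        }
        System.out.println(Arrays.toString(row)); // [v1, v2, v3, v2]
      }
    }
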
@@ -2492,6 +2492,26 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
FileUtils.deleteDirectory(new File(writerPath))
}

test("test Local Dictionary with Default") {
FileUtils.deleteDirectory(new File(writerPath))
val builder = CarbonWriter.builder
.sortBy(Array[String]("name")).withBlockSize(12)
.uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath).writtenBy("TestNonTransactionalCarbonTable")
generateCarbonData(builder)
assert(FileFactory.getCarbonFile(writerPath).exists())
assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0,writerPath)))
sql("DROP TABLE IF EXISTS sdkTable")
sql(
s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
val descLoc = sql("describe formatted sdkTable").collect
descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
case Some(row) => assert(row.get(1).toString.contains("true"))
case None => assert(false)
}
FileUtils.deleteDirectory(new File(writerPath))
}

def generateCarbonData(builder :CarbonWriterBuilder): Unit ={
val fields = new Array[Field](3)
fields(0) = new Field("name", DataTypes.STRING)
@@ -26,6 +26,7 @@
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;

@@ -47,6 +48,8 @@ public class CarbonReader<T> {

private boolean initialise;

private boolean isFirst = true;

/**
* save batch rows data
*/
@@ -69,6 +72,10 @@ public class CarbonReader<T> {
* Return true if has next row
*/
public boolean hasNext() throws IOException, InterruptedException {
if (isFirst) {
ThreadLocalTaskInfo.setCarbonTaskInfo(ThreadLocalTaskInfo.getCarbonTaskInfo());
isFirst = false;
}
validateReader();
if (currentReader.nextKeyValue()) {
return true;
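
As a usage note (not part of this commit): with the guard above, the Carbon task info is registered lazily on the first hasNext() call, so a plain SDK read loop needs no extra setup. A rough sketch under the assumption that the reader is built with the usual CarbonReader.builder(path, tableName) / projection / build / readNextRow / close SDK calls; the path, table name, and column names are placeholders:

    import java.io.IOException;
    import org.apache.carbondata.sdk.file.CarbonReader;

    public class ReadLoopSketch {
      // Sketch only: the builder arguments are hypothetical.
      public static void readAll(String path) throws IOException, InterruptedException {
        CarbonReader reader = CarbonReader.builder(path, "_temp")
            .projection(new String[]{"name", "age"})
            .build();
        while (reader.hasNext()) {          // first call now registers the Carbon task info
          Object[] row = (Object[]) reader.readNextRow();
          // consume row ...
        }
        reader.close();
      }
    }
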
@@ -65,7 +65,8 @@ public class CarbonWriterBuilder {
private Map<String, String> options;
private String taskNo;
private int localDictionaryThreshold;
private boolean isLocalDictionaryEnabled;
private boolean isLocalDictionaryEnabled = Boolean.parseBoolean(
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT);
private short numOfThreads;
private Configuration hadoopConf;
private String writtenByApp;
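
For context (not part of this commit): the field default is now derived from the same constant used for the table-level property, so an SDK writer that never calls enableLocalDictionary gets local dictionary turned on. A minimal illustration, assuming CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT holds the string "true":

    public class LocalDictDefaultSketch {
      public static void main(String[] args) {
        // Stand-in for CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT (assumed to be "true").
        String localDictionaryEnableDefault = "true";
        // Previously the field defaulted to Java's false; now it follows the constant.
        boolean isLocalDictionaryEnabled = Boolean.parseBoolean(localDictionaryEnableDefault);
        System.out.println(isLocalDictionaryEnabled); // prints: true
      }
    }
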
@@ -93,10 +93,10 @@ public void testSummaryOutputIndividual() {
String output = new String(out.toByteArray());

String expectedOutput = buildLines(
"Input Folder: ./CarbonCliTest",
"## Summary",
"total: 6 blocks, 2 shards, 14 blocklets, 314 pages, 10,000,000 rows, 32.26MB",
"avg: 5.38MB/block, 2.30MB/blocklet, 1,666,666 rows/block, 714,285 rows/blocklet");
"Input Folder: ./CarbonCliTest" ,
"## Summary",
"total: 6 blocks, 2 shards, 12 blocklets, 314 pages, 10,000,000 rows, 28.68MB",
"avg: 4.78MB/block, 2.39MB/blocklet, 1,666,666 rows/block, 833,333 rows/blocklet");
Assert.assertTrue(output.contains(expectedOutput));

String[] args2 = {"-cmd", "summary", "-p", path, "-s"};
@@ -129,14 +129,13 @@ public void testSummaryOutputIndividual() {
output = new String(out.toByteArray());

expectedOutput = buildLines(
"BLK BLKLT NumPages NumRows Size ",
"0 0 25 800,000 2.58MB ",
"0 1 25 800,000 2.58MB ",
"1 0 25 800,000 2.58MB ",
"1 1 25 800,000 2.58MB ",
"2 0 25 800,000 2.58MB ",
"2 1 25 800,000 2.58MB ",
"2 2 7 200,000 660.70KB ");
"BLK BLKLT NumPages NumRows Size " ,
"0 0 28 896,000 2.57MB " ,
"0 1 28 896,000 2.57MB " ,
"1 0 28 896,000 2.57MB " ,
"1 1 28 896,000 2.57MB " ,
"2 0 28 896,000 2.57MB " ,
"2 1 17 520,000 1.49MB ");
Assert.assertTrue(output.contains(expectedOutput));

String[] args5 = {"-cmd", "summary", "-p", path, "-c", "name"};
@@ -146,14 +145,13 @@ public void testSummaryOutputIndividual() {
output = new String(out.toByteArray());

expectedOutput = buildLines(
"BLK BLKLT Meta Size Data Size LocalDict DictEntries DictSize AvgPageSize Min% Max% Min Max ",
"0 0 1.74KB 295.67KB false 0 0.0B 11.76KB NA NA robot0 robot1 ",
"0 1 1.74KB 295.67KB false 0 0.0B 11.76KB NA NA robot1 robot3 ",
"1 0 1.74KB 295.67KB false 0 0.0B 11.76KB NA NA robot3 robot4 ",
"1 1 1.74KB 295.67KB false 0 0.0B 11.76KB NA NA robot4 robot6 ",
"2 0 1.74KB 295.67KB false 0 0.0B 11.76KB NA NA robot6 robot7 ",
"2 1 1.74KB 295.67KB false 0 0.0B 11.76KB NA NA robot8 robot9 ",
"2 2 498.0B 73.97KB false 0 0.0B 10.50KB NA NA robot9 robot9 ");
"BLK BLKLT Meta Size Data Size LocalDict DictEntries DictSize AvgPageSize Min% Max% Min Max " ,
"0 0 1.90KB 2.15KB true 2 18.0B 9.0B NA NA robot0 robot1 " ,
"0 1 1.90KB 2.16KB true 3 22.0B 9.0B NA NA robot1 robot3 " ,
"1 0 1.90KB 2.16KB true 3 22.0B 9.0B NA NA robot3 robot5 " ,
"1 1 1.90KB 2.16KB true 3 22.0B 9.0B NA NA robot5 robot7 " ,
"2 0 1.90KB 2.14KB true 2 18.0B 9.0B NA NA robot7 robot8 " ,
"2 1 1.18KB 1.33KB true 2 18.0B 9.0B NA NA robot8 robot9 ");
Assert.assertTrue(output.contains(expectedOutput));
}

@@ -166,10 +164,10 @@ public void testSummaryOutputAll() {
String output = new String(out.toByteArray());

String expectedOutput = buildLines(
"Input Folder: ./CarbonCliTest",
"Input Folder: ./CarbonCliTest" ,
"## Summary",
"total: 6 blocks, 2 shards, 14 blocklets, 314 pages, 10,000,000 rows, 32.26MB",
"avg: 5.38MB/block, 2.30MB/blocklet, 1,666,666 rows/block, 714,285 rows/blocklet");
"total: 6 blocks, 2 shards, 12 blocklets, 314 pages, 10,000,000 rows, 28.68MB",
"avg: 4.78MB/block, 2.39MB/blocklet, 1,666,666 rows/block, 833,333 rows/blocklet");

Assert.assertTrue(output.contains(expectedOutput));

@@ -186,21 +184,20 @@ public void testSummaryOutputAll() {

expectedOutput = buildLines(
"BLK BLKLT NumPages NumRows Size ",
"0 0 25 800,000 2.58MB ",
"0 1 25 800,000 2.58MB ",
"1 0 25 800,000 2.58MB ",
"1 1 25 800,000 2.58MB ");
"0 0 28 896,000 2.57MB ",
"0 1 28 896,000 2.57MB ",
"1 0 28 896,000 2.57MB ",
"1 1 28 896,000 2.57MB ");
Assert.assertTrue(output.contains(expectedOutput));

expectedOutput = buildLines(
"BLK BLKLT Meta Size Data Size LocalDict DictEntries DictSize AvgPageSize Min% Max% Min Max ",
"0 0 3.00KB 4.87MB false 0 0.0B 93.76KB 0.0 100.0 0 2999990 ",
"0 1 3.00KB 2.29MB false 0 0.0B 93.76KB 0.0 100.0 1 2999992 ",
"1 0 3.00KB 4.87MB false 0 0.0B 93.76KB 0.0 100.0 3 2999993 ",
"1 1 3.00KB 2.29MB false 0 0.0B 93.76KB 0.0 100.0 4 2999995 ",
"2 0 3.00KB 5.52MB false 0 0.0B 93.76KB 0.0 100.0 6 2999997 ",
"2 1 3.00KB 2.94MB false 0 0.0B 93.76KB 0.0 100.0 8 2999998 ",
"2 2 858.0B 586.84KB false 0 0.0B 83.71KB 0.0 100.0 9 2999999 ");
"BLK BLKLT Meta Size Data Size LocalDict DictEntries DictSize AvgPageSize Min% Max% Min Max " ,
"0 0 3.36KB 5.14MB false 0 0.0B 93.76KB 0.0 100.0 0 2999990 " ,
"0 1 3.36KB 2.57MB false 0 0.0B 93.76KB 0.0 100.0 1 2999992 " ,
"1 0 3.36KB 5.14MB false 0 0.0B 93.76KB 0.0 100.0 3 2999994 " ,
"1 1 3.36KB 2.57MB false 0 0.0B 93.76KB 0.0 100.0 5 2999996 " ,
"2 0 3.36KB 4.06MB false 0 0.0B 93.76KB 0.0 100.0 7 2999998 " ,
"2 1 2.04KB 1.49MB false 0 0.0B 89.62KB 0.0 100.0 9 2999999 ");
Assert.assertTrue(output.contains(expectedOutput));

expectedOutput = buildLines(
@@ -220,7 +217,7 @@ public void testSummaryPageMeta() {
System.out.println(output);
String expectedOutput = buildLines(
"Blocklet 0:",
"Page 0 (offset 0, length 12039): DataChunk2(chunk_meta:ChunkCompressionMeta(compression_codec:DEPRECATED, total_uncompressed_size:256000, total_compressed_size:12039, compressor_name:snappy), rowMajor:false, data_page_length:12039, presence:PresenceMeta(represents_presence:false, present_bit_stream:00), sort_state:SORT_NATIVE, encoders:[], encoder_meta:[], min_max:BlockletMinMaxIndex(min_values:[72 6F 62 6F 74 30], max_values:[72 6F 62 6F 74 30], min_max_presence:[true]), numberOfRowsInpage:32000)");
"Page 0 (offset 0, length 9): DataChunk2(chunk_meta:ChunkCompressionMeta(compression_codec:DEPRECATED, total_uncompressed_size:96000, total_compressed_size:9, compressor_name:snappy), rowMajor:false, data_page_length:5, rle_page_length:4, presence:PresenceMeta(represents_presence:false, present_bit_stream:00), sort_state:SORT_NATIVE, encoders:[RLE], encoder_meta:[], min_max:BlockletMinMaxIndex(min_values:[72 6F 62 6F 74 30], max_values:[72 6F 62 6F 74 30], min_max_presence:[true]), numberOfRowsInpage:32000)");
Assert.assertTrue(output.contains(expectedOutput));
}

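
The updated expected outputs above are consistent with the new local-dictionary default for SDK-written files: fewer and smaller blocklets overall, and LocalDict=true with RLE-encoded pages on the name column. For reference (not part of this commit), a rough sketch of invoking the summary command programmatically; the store path is a placeholder and the assumption is that CarbonCli exposes the standard main(String[]) entry point:

    import org.apache.carbondata.tool.CarbonCli;

    public class CliSummarySketch {
      public static void main(String[] args) {
        // Hypothetical store path; "-a" prints the full summary, as in the tests above.
        String path = "./CarbonCliTest";
        CarbonCli.main(new String[]{"-cmd", "summary", "-p", path, "-a"});
      }
    }
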
