-
Notifications
You must be signed in to change notification settings - Fork 703
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added SDV test cases for Local Dictionary Support
- Loading branch information
1 parent
8e54f1e
commit 95b96ad
Showing
2 changed files
with
314 additions
and
2 deletions.
There are no files selected for viewing
309 changes: 309 additions & 0 deletions
309
...la/org/apache/carbondata/cluster/sdv/generated/LoadTableWithLocalDictionaryTestCase.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,309 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.carbondata.spark.testsuite.localdictionary | ||
|
||
import scala.collection.JavaConverters._ | ||
import java.io.{File, PrintWriter} | ||
import java.util | ||
import java.util.Collections | ||
|
||
import org.apache.spark.sql.test.util.QueryTest | ||
import org.scalatest.{BeforeAndAfterAll, Ignore} | ||
|
||
import org.apache.carbondata.core.cache.dictionary.DictionaryByteArrayWrapper | ||
import org.apache.carbondata.core.constants.CarbonCommonConstants | ||
import org.apache.carbondata.core.datastore.block.TableBlockInfo | ||
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk | ||
import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory | ||
import org.apache.carbondata.core.datastore.chunk.reader.dimension.v3.CompressedDimensionChunkFileBasedReaderV3 | ||
import org.apache.carbondata.core.datastore.compression.CompressorFactory | ||
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} | ||
import org.apache.carbondata.core.datastore.impl.FileFactory | ||
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory | ||
import org.apache.carbondata.core.metadata.ColumnarFormatVersion | ||
import org.apache.carbondata.core.util.{CarbonProperties, DataFileFooterConverterV3} | ||
|
||
/**
 * SDV test cases validating local-dictionary generation during data load.
 *
 * Each test creates table `local2` with a specific combination of
 * local-dictionary table properties, loads data, and then inspects the
 * written carbondata file's dimension raw column chunks to verify whether
 * a local dictionary page was (or was not) generated.
 */
class LoadTableWithLocalDictionaryTestCase extends QueryTest with BeforeAndAfterAll {

  // CSV with 9000 distinct single-column string rows (exceeds the default threshold scenarios).
  val file1 = resourcesPath + "/local_dictionary_source1.csv"

  // CSV with struct<i:string,s:string> complex data, '$' as the level-1 delimiter.
  val file2 = resourcesPath + "/local_dictionary_complex_data.csv"

  // Segment directory of table `local2`; the fact files written there are read back directly.
  val storePath = warehouse + "/local2/Fact/Part0/Segment_0"

  override protected def beforeAll(): Unit = {
    // Small blocklet size so loads produce blocklets quickly for inspection.
    CarbonProperties.getInstance.addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "10000")
    createFile(file1)
    createComplexData(file2)
    sql("drop table if exists local2")
  }

  test("test LocalDictionary Load For FallBackScenario"){
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
      "('local_dictionary_threshold'='2001','local_dictionary_include'='name')")
    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
    // 9000 distinct values exceed threshold 2001, so generation must fall back.
    assert(!checkForLocalDictionary(getDimRawChunk(0)))
  }

  test("test successful local dictionary generation"){
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
      "('local_dictionary_enable'='true','local_dictionary_threshold'='9001','local_dictionary_include'='name')")
    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
    // 9000 distinct values fit within threshold 9001, so a dictionary is kept.
    assert(checkForLocalDictionary(getDimRawChunk(0)))
  }

  test("test successful local dictionary generation for default configs") {
    sql("drop table if exists local2")
    sql("CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties('local_dictionary_enable'='true')")
    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
    assert(checkForLocalDictionary(getDimRawChunk(0)))
  }

  test("test local dictionary generation for local dictioanry include") {
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
      "('dictionary_include'='name')")
    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
    // Global dictionary_include on the column suppresses local dictionary.
    assert(!checkForLocalDictionary(getDimRawChunk(0)))
  }

  test("test local dictionary generation for local dictioanry exclude"){
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
      "('local_dictionary_enable'='true','dictionary_exclude'='name')")
    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
    assert(checkForLocalDictionary(getDimRawChunk(0)))
  }

  test("test local dictionary generation when it is disabled"){
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
      "('local_dictionary_enable'='false','local_dictionary_include'='name')")
    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
    assert(!checkForLocalDictionary(getDimRawChunk(0)))
  }

  test("test local dictionary generation for invalid threshold configurations"){
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties" +
      "('local_dictionary_enable'='true','local_dictionary_include'='name','local_dictionary_threshold'='300000')")
    sql("load data inpath '" + file1 + "' into table local2 OPTIONS('header'='false')")
    // An out-of-range threshold presumably falls back to the default, which
    // still accommodates the 9000 distinct values — TODO confirm default.
    assert(checkForLocalDictionary(getDimRawChunk(0)))
  }

  test("test local dictionary generation for include and exclude"){
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name string, age string) STORED BY 'carbondata' tblproperties" +
      "('local_dictionary_enable'='true','local_dictionary_include'='name', 'local_dictionary_exclude'='age')")
    sql("insert into table local2 values('vishal', '30')")
    // Column 0 (name) is included, column 1 (age) is excluded.
    assert(checkForLocalDictionary(getDimRawChunk(0)))
    assert(!checkForLocalDictionary(getDimRawChunk(1)))
  }

  test("test local dictionary generation for complex type"){
    sql("drop table if exists local2")
    sql(
      "CREATE TABLE local2(name struct<i:string,s:string>) STORED BY 'carbondata' tblproperties" +
      "('local_dictionary_enable'='true','local_dictionary_include'='name')")
    sql("load data inpath '" + file2 +
        "' into table local2 OPTIONS('header'='false','COMPLEX_DELIMITER_LEVEL_1'='$', " +
        "'COMPLEX_DELIMITER_LEVEL_2'=':')")
    // Chunk 0 is the struct parent column (no dictionary); chunks 1 and 2 are
    // its string children, which do get local dictionaries.
    assert(!checkForLocalDictionary(getDimRawChunk(0)))
    assert(checkForLocalDictionary(getDimRawChunk(1)))
    assert(checkForLocalDictionary(getDimRawChunk(2)))
  }

  test("test to validate local dictionary values"){
    sql("drop table if exists local2")
    sql("CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties('local_dictionary_enable'='true')")
    sql("load data inpath '" + resourcesPath + "/localdictionary.csv" + "' into table local2")
    val dimRawChunk = getDimRawChunk(0)
    val dictionaryData = Array("vishal", "kumar", "akash", "praveen", "brijoo")
    try {
      assert(validateDictionary(dimRawChunk.get(0), dictionaryData))
    } catch {
      case e: Exception =>
        // Fail with the actual cause instead of a bare assert(false),
        // so the test report shows what went wrong during decoding.
        assert(false, s"local dictionary validation threw: ${e.getMessage}")
    }
  }

  override protected def afterAll(): Unit = {
    sql("drop table if exists local2")
    deleteFile(file1)
    deleteFile(file2)
    // Restore the blocklet size changed in beforeAll.
    CarbonProperties.getInstance
      .addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
        CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
  }

  /**
   * Creates a CSV data file with one sorted string value ("n<i>") per line.
   *
   * @param fileName absolute path of the file to create
   * @param line     exclusive upper bound of the generated index range
   * @param start    inclusive lower bound of the generated index range
   */
  private def createFile(fileName: String, line: Int = 9000, start: Int = 0): Unit = {
    val writer = new PrintWriter(new File(fileName))
    try {
      // Lexicographically sorted, matching the original Collections.sort order.
      (start until line).map("n" + _).sorted.foreach(writer.println)
    } finally {
      // Close in finally so the writer is not leaked if println throws.
      writer.close()
    }
  }

  /**
   * Creates a CSV file of complex (struct) data, one "n<i>$<i+1>" record per
   * line, sorted lexicographically.
   *
   * @param fileName absolute path of the file to create
   * @param line     exclusive upper bound of the generated index range
   * @param start    inclusive lower bound of the generated index range
   */
  private def createComplexData(fileName: String, line: Int = 9000, start: Int = 0): Unit = {
    val writer = new PrintWriter(new File(fileName))
    try {
      (start until line).map(i => "n" + i + "$" + (i + 1)).sorted.foreach(writer.println)
    } finally {
      writer.close()
    }
  }

  /**
   * Deletes a generated CSV file after test execution, if it exists.
   *
   * @param fileName absolute path of the file to delete
   */
  private def deleteFile(fileName: String): Unit = {
    val file = new File(fileName)
    if (file.exists()) {
      file.delete()
    }
  }

  /**
   * Returns true if any of the given raw dimension chunks carries a local
   * dictionary page in its V3 data chunk metadata.
   */
  private def checkForLocalDictionary(
      dimensionRawColumnChunks: util.List[DimensionRawColumnChunk]): Boolean = {
    // asScala + exists replaces the deprecated JavaConversions wildcard import
    // and the hand-rolled mutable-flag loop.
    dimensionRawColumnChunks.asScala.exists(_.getDataChunkV3.isSetLocal_dictionary)
  }

  /**
   * Reads the raw dimension column chunks for the given column index from the
   * first fact file of table `local2`'s segment directory.
   *
   * @param blockindex dimension (column) index whose chunks are wanted
   * @return one raw chunk per blocklet of the first carbondata file found
   */
  private def getDimRawChunk(blockindex: Int): util.ArrayList[DimensionRawColumnChunk] = {
    val dataFiles = FileFactory.getCarbonFile(storePath)
      .listFiles(new CarbonFileFilter() {
        // Keep only .carbondata fact files.
        override def accept(file: CarbonFile): Boolean =
          file.getName.endsWith(CarbonCommonConstants.FACT_FILE_EXT)
      })
    read(dataFiles(0).getAbsolutePath, blockindex)
  }

  /**
   * Low-level read of one column's raw chunks from a carbondata file: locates
   * the footer, then uses the V3 chunk reader on every blocklet.
   *
   * @param filePath   absolute path of the .carbondata file
   * @param blockIndex dimension (column) index to read
   */
  private def read(filePath: String,
      blockIndex: Int): util.ArrayList[DimensionRawColumnChunk] = {
    val carbonDataFile = new File(filePath)
    val dimensionRawColumnChunks = new util.ArrayList[DimensionRawColumnChunk]
    val offset = carbonDataFile.length
    val converter = new DataFileFooterConverterV3
    val fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
    // Reads the long stored in the file's last 8 bytes — presumably the
    // footer offset of the V3 format; TODO confirm against the format spec.
    val actualOffset = fileReader.readLong(carbonDataFile.getAbsolutePath, offset - 8)
    val blockInfo = new TableBlockInfo(carbonDataFile.getAbsolutePath,
      actualOffset,
      "0",
      new Array[String](0),
      carbonDataFile.length,
      ColumnarFormatVersion.V3,
      null)
    val dataFileFooter = converter.readDataFileFooter(blockInfo)
    // asScala replaces the deprecated JavaConversions wildcard import.
    dataFileFooter.getBlockletList.asScala.foreach { blockletInfo =>
      val dimensionColumnChunkReader = CarbonDataReaderFactory
        .getInstance
        .getDimensionColumnChunkReader(ColumnarFormatVersion.V3,
          blockletInfo,
          dataFileFooter.getSegmentInfo.getColumnCardinality,
          carbonDataFile.getAbsolutePath,
          false).asInstanceOf[CompressedDimensionChunkFileBasedReaderV3]
      dimensionRawColumnChunks
        .add(dimensionColumnChunkReader.readRawDimensionChunk(fileReader, blockIndex))
    }
    dimensionRawColumnChunks
  }

  /**
   * Decodes the local dictionary page of the given raw chunk and checks that
   * every expected value is present in it.
   *
   * @param rawColumnPage raw chunk whose local dictionary is validated
   * @param data          values that must all appear in the dictionary
   * @return true when a dictionary exists and contains all of `data`
   */
  private def validateDictionary(rawColumnPage: DimensionRawColumnChunk,
      data: Array[String]): Boolean = {
    val localDictionary = rawColumnPage.getDataChunkV3.local_dictionary
    if (null == localDictionary) {
      return false
    }
    val encodings = localDictionary.getDictionary_meta.encoders
    val encoderMetas = localDictionary.getDictionary_meta.getEncoder_meta
    val encodingFactory = DefaultEncodingFactory.getInstance
    val decoder = encodingFactory.createDecoder(encodings, encoderMetas)
    val dictionaryPage = decoder
      .decode(localDictionary.getDictionary_data, 0, localDictionary.getDictionary_data.length)
    val dictionaryMap = new util.HashMap[DictionaryByteArrayWrapper, Integer]
    // The dictionary_values bitset marks which surrogate keys are in use.
    val usedDictionaryValues = util.BitSet
      .valueOf(CompressorFactory.getInstance.getCompressor
        .unCompressByte(localDictionary.getDictionary_values))
    // Map each dictionary page entry (in page order) to its set surrogate key;
    // this replaces the original's obfuscated { index += 1; index - 1 } idiom.
    var index = 0
    var i = usedDictionaryValues.nextSetBit(0)
    while (i >= 0) {
      dictionaryMap.put(new DictionaryByteArrayWrapper(dictionaryPage.getBytes(index)), i)
      index += 1
      i = usedDictionaryValues.nextSetBit(i + 1)
    }
    // Every expected value must resolve to a surrogate key.
    data.forall(value => dictionaryMap.get(new DictionaryByteArrayWrapper(value.getBytes)) != null)
  }

}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters