Support column compressor in table properties
Support specifying the column compressor in table properties when creating a table.
xuchuanyin committed Sep 11, 2018
1 parent 11fa3bb commit 308319e
Showing 17 changed files with 112 additions and 25 deletions.
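
For context, the end-to-end usage this commit enables looks roughly like the following. This is a sketch, not part of the commit: the table and column names are invented, and it assumes CarbonCommonConstants.COMPRESSOR maps to the table-property key 'carbon.column.compressor', the constant the tests below exercise with 'zstd'.

// Sketch only: create a table whose column pages are compressed with zstd
// instead of the system-wide default (snappy). Assumes a SparkSession
// 'spark' with the CarbonData 'carbondata' source configured.
spark.sql(
  """
    | CREATE TABLE compressor_demo(id INT, name STRING)
    | STORED BY 'carbondata'
    | TBLPROPERTIES('carbon.column.compressor'='zstd')
  """.stripMargin)

// Subsequent loads pick the compressor from the table properties; an
// unsupported compressor name is rejected at CREATE TABLE time with
// InvalidConfigurationException (see CarbonCreateTableCommand below).
spark.sql("INSERT INTO compressor_demo VALUES (1, 'a')")
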
@@ -27,7 +27,6 @@
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.Encoding;

@@ -65,7 +64,8 @@ public DecoderBasedFallbackEncoder(EncodedColumnPage encodedColumnPage, int page
int[] rlePage;

// uncompress the encoded column page
-byte[] bytes = CompressorFactory.getInstance().getCompressor()
+byte[] bytes = CompressorFactory.getInstance().getCompressor(
+encodedColumnPage.getActualPage().getColumnPageEncoderMeta().getCompressorName())
.unCompressByte(encodedColumnPage.getEncodedData().array(), offset,
encodedColumnPage.getPageMetadata().data_page_length);

@@ -94,12 +94,6 @@ public DecoderBasedFallbackEncoder(EncodedColumnPage encodedColumnPage, int page
// disable encoding using local dictionary
encodedColumnPage.getActualPage().disableLocalDictEncoding();

-// get column spec for existing column page
-TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
-
-// get the dataType of column
-DataType dataType = encodedColumnPage.getActualPage().getDataType();
-
// create a new column page which will have actual data instead of encoded data
ColumnPage actualDataColumnPage =
ColumnPage.newPage(encodedColumnPage.getActualPage().getColumnPageEncoderMeta(),
@@ -121,6 +115,8 @@ public DecoderBasedFallbackEncoder(EncodedColumnPage encodedColumnPage, int page
.putBytes(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(keyArray));
}

+// get column spec for existing column page
+TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
FallbackEncodedColumnPage fallBackEncodedColumnPage =
CarbonUtil.getFallBackEncodedColumnPage(actualDataColumnPage, pageIndex, columnSpec);
// here freeing the memory of new column page created as fallback is done and
@@ -294,11 +294,16 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException
}
model = new CarbonLoadModel();
CarbonProperties carbonProperty = CarbonProperties.getInstance();
-model.setColumnCompressor(CompressorFactory.getInstance().getCompressor().getName());
model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
model.setTableName(CarbonTableOutputFormat.getTableName(conf));
model.setCarbonTransactionalTable(true);
CarbonTable carbonTable = getCarbonTable(conf);
+String columnCompressor = carbonTable.getTableInfo().getFactTable().getTableProperties().get(
+CarbonCommonConstants.COMPRESSOR);
+if (null == columnCompressor) {
+columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
+}
+model.setColumnCompressor(columnCompressor);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable));
model.setTablePath(getTablePath(conf));
setFileHeader(conf, model);
@@ -135,7 +135,12 @@ public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String fac
AbsoluteTableIdentifier absoluteTableIdentifier) {
CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
CarbonLoadModel loadModel = new CarbonLoadModel();
-loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor().getName());
+String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get(
+CarbonCommonConstants.COMPRESSOR);
+if (columnCompressor == null) {
+columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
+}
+loadModel.setColumnCompressor(columnCompressor);
loadModel.setCarbonDataLoadSchema(schema);
loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -84,7 +84,11 @@ object CarbonDataStoreCreator {
writeDictionary(dataFilePath, table, absoluteTableIdentifier)
val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
val loadModel: CarbonLoadModel = new CarbonLoadModel()
-loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor().getName());
+import scala.collection.JavaConverters._
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor().getName())
+loadModel.setColumnCompressor(columnCompressor)
loadModel.setCarbonDataLoadSchema(schema)
loadModel.setDatabaseName(
absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
@@ -24,14 +24,15 @@ import java.util.Calendar

import scala.util.Random

-import org.apache.commons.lang3.RandomStringUtils
+import org.apache.commons.lang3.{RandomStringUtils, StringUtils}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser
@@ -77,7 +78,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll
}
}

-private def createTable(streaming: Boolean = false): Unit = {
+private def createTable(streaming: Boolean = false, columnCompressor: String = ""): Unit = {
sql(s"DROP TABLE IF EXISTS $tableName")
sql(
s"""
@@ -100,6 +101,7 @@
| )
| STORED BY 'carbondata'
| TBLPROPERTIES(
+| ${if (StringUtils.isBlank(columnCompressor)) "" else s"'${CarbonCommonConstants.COMPRESSOR}'='$columnCompressor',"}
| ${if (streaming) "" else s"'LONG_STRING_COLUMNS'='longStringField',"}
| 'SORT_COLUMNS'='stringSortField',
| 'DICTIONARY_INCLUDE'='stringDictField',
@@ -297,6 +299,30 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll
checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
}

test("test creating table with specified compressor") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// the system configuration for compressor is snappy
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
// create table with zstd as compressor
createTable(columnCompressor = "zstd")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
assert("zstd".equalsIgnoreCase(tableColumnCompressor))
}

test("test creating table with unsupported compressor") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// the system configuration for compressor is snappy
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
// create table with unsupported compressor
val exception = intercept[InvalidConfigurationException] {
createTable (columnCompressor = "fakecompressor")
}
assert(exception.getMessage.contains("fakecompressor compressor is not supported"))
}

private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
saveMode: SaveMode = SaveMode.Overwrite): Unit = {
val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
@@ -272,7 +272,10 @@ object StreamSinkFactory {
getConf.get("spark.driver.host")
carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt)
-carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+val columnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+carbonLoadModel.setColumnCompressor(columnCompressor)
carbonLoadModel
}
}
@@ -289,7 +289,10 @@ object CarbonDataRDDFactory {
loadModel.readAndSetLoadMetadataDetails()
val loadStartTime = CarbonUpdateUtil.readCurrentTime()
loadModel.setFactTimeStamp(loadStartTime)
-loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+loadModel.setColumnCompressor(columnCompressor)
loadModel
}

@@ -160,7 +160,10 @@ case class CarbonAlterTableCompactionCommand(
carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
carbonLoadModel.setTablePath(table.getTablePath)
-carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+carbonLoadModel.setColumnCompressor(columnCompressor)

var storeLocation = System.getProperty("java.io.tmpdir")
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
@@ -207,7 +207,10 @@ case class CarbonLoadDataCommand(
carbonLoadModel.setAggLoadRequest(
internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
-carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+carbonLoadModel.setColumnCompressor(columnCompressor)

val javaPartition = mutable.Map[String, String]()
partition.foreach { case (k, v) =>
@@ -124,7 +124,10 @@ case class CarbonAlterTableAddHivePartitionCommand(
"Schema of index files located in location is not matching with current table schema")
}
val loadModel = new CarbonLoadModel
-loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+loadModel.setColumnCompressor(columnCompressor)
loadModel.setCarbonTransactionalTable(true)
loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
// Create new entry in tablestatus file
@@ -146,7 +146,10 @@ case class CarbonAlterTableDropPartitionCommand(
carbonLoadModel.setTablePath(table.getTablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
-carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+carbonLoadModel.setColumnCompressor(columnCompressor)
alterTableDropPartition(
sparkSession.sqlContext,
model.partitionId,
@@ -142,9 +142,12 @@ case class CarbonAlterTableSplitPartitionCommand(
LockUsage.ALTER_PARTITION_LOCK)
locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
locksToBeAcquired)(sparkSession)
-val carbonLoadModel = new CarbonLoadModel()
-carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+val carbonLoadModel = new CarbonLoadModel()
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+carbonLoadModel.setColumnCompressor(columnCompressor)
val tablePath = table.getTablePath
val dataLoadSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.command.table

import scala.collection.JavaConverters._

+import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.MetadataCommand

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -99,6 +101,18 @@ case class CarbonCreateTableCommand(
throwMetadataException(dbName, tableName, "Table should have at least one column.")
}

+// Add validation for the column compressor when creating the table
+val columnCompressor = tableInfo.getFactTable.getTableProperties.get(
+CarbonCommonConstants.COMPRESSOR)
+try {
+if (null != columnCompressor) {
+CompressorFactory.getInstance().getCompressor(columnCompressor)
+}
+} catch {
+case ex: UnsupportedOperationException =>
+throw new InvalidConfigurationException(ex.getMessage)
+}
+
val operationContext = new OperationContext
val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo))
@@ -88,7 +88,10 @@ with Serializable {
val table = CarbonEnv.getCarbonTable(
TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
val model = new CarbonLoadModel
-model.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+model.setColumnCompressor(columnCompressor)

val carbonProperty = CarbonProperties.getInstance()
val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
@@ -71,7 +71,11 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)
}
-carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+import scala.collection.JavaConverters._
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+carbonLoadModel.setColumnCompressor(columnCompressor)
carbonLoadModel
}

@@ -186,7 +186,11 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)
}
-carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName)
+import scala.collection.JavaConverters._
+val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+.getOrElse(CarbonCommonConstants.COMPRESSOR,
+CompressorFactory.getInstance().getCompressor.getName)
+carbonLoadModel.setColumnCompressor(columnCompressor)
carbonLoadModel
}

@@ -27,6 +27,7 @@

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -177,7 +178,11 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
} else {
// IF the file is not existed, use the create api
outputStream = FileFactory.getDataOutputStream(filePath, fileType);
-compressorName = CompressorFactory.getInstance().getCompressor().getName();
+compressorName = carbonTable.getTableInfo().getFactTable().getTableProperties().get(
+CarbonCommonConstants.COMPRESSOR);
+if (null == compressorName) {
+compressorName = CompressorFactory.getInstance().getCompressor().getName();
+}
writeFileHeader();
}

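Design note: every call site above resolves the compressor identically, reading the table property first and falling back to the system-wide default. A condensed sketch of that shared pattern follows; the commit inlines the lookup at each site, and a helper such as resolveCompressorName below is illustrative only, not part of the patch.

import scala.collection.JavaConverters._

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Illustrative only: the table-level compressor property wins; otherwise
// fall back to the compressor configured at the system level.
def resolveCompressorName(table: CarbonTable): String = {
  table.getTableInfo.getFactTable.getTableProperties.asScala
    .getOrElse(CarbonCommonConstants.COMPRESSOR,
      CompressorFactory.getInstance().getCompressor.getName)
}

Keeping the lookup inline at each call site mirrors the existing style of these load paths, at the cost of some repetition.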
