diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java index 7a6bbed3eed..9bed89fce29 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java @@ -27,7 +27,6 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.Encoding; @@ -65,7 +64,8 @@ public DecoderBasedFallbackEncoder(EncodedColumnPage encodedColumnPage, int page int[] rlePage; // uncompress the encoded column page - byte[] bytes = CompressorFactory.getInstance().getCompressor() + byte[] bytes = CompressorFactory.getInstance().getCompressor( + encodedColumnPage.getActualPage().getColumnPageEncoderMeta().getCompressorName()) .unCompressByte(encodedColumnPage.getEncodedData().array(), offset, encodedColumnPage.getPageMetadata().data_page_length); @@ -94,12 +94,6 @@ public DecoderBasedFallbackEncoder(EncodedColumnPage encodedColumnPage, int page // disable encoding using local dictionary encodedColumnPage.getActualPage().disableLocalDictEncoding(); - // get column spec for existing column page - TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec(); - - // get the dataType of column - DataType dataType = encodedColumnPage.getActualPage().getDataType(); - // create a new column page which will have actual data instead of encoded data ColumnPage actualDataColumnPage = ColumnPage.newPage(encodedColumnPage.getActualPage().getColumnPageEncoderMeta(), @@ -121,6 +115,8 @@ public DecoderBasedFallbackEncoder(EncodedColumnPage encodedColumnPage, int page .putBytes(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(keyArray)); } + // get column spec for existing column page + TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec(); FallbackEncodedColumnPage fallBackEncodedColumnPage = CarbonUtil.getFallBackEncodedColumnPage(actualDataColumnPage, pageIndex, columnSpec); // here freeing the memory of new column page created as fallback is done and diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index c89155ba761..bb6f7f3e019 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -294,11 +294,16 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio } model = new CarbonLoadModel(); CarbonProperties carbonProperty = CarbonProperties.getInstance(); - model.setColumnCompressor(CompressorFactory.getInstance().getCompressor().getName()); model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf)); model.setTableName(CarbonTableOutputFormat.getTableName(conf)); model.setCarbonTransactionalTable(true); CarbonTable carbonTable = getCarbonTable(conf); + String columnCompressor = carbonTable.getTableInfo().getFactTable().getTableProperties().get( + CarbonCommonConstants.COMPRESSOR); + if (null == columnCompressor) { + columnCompressor = CompressorFactory.getInstance().getCompressor().getName(); + } + model.setColumnCompressor(columnCompressor); model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)); model.setTablePath(getTablePath(conf)); setFileHeader(conf, model); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java index d27d732cba5..7cd241af79b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java @@ -135,7 +135,12 @@ public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String fac AbsoluteTableIdentifier absoluteTableIdentifier) { CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); CarbonLoadModel loadModel = new CarbonLoadModel(); - loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor().getName()); + String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get( + CarbonCommonConstants.COMPRESSOR); + if (columnCompressor == null) { + columnCompressor = CompressorFactory.getInstance().getCompressor().getName(); + } + loadModel.setColumnCompressor(columnCompressor); loadModel.setCarbonDataLoadSchema(schema); loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala index 4be730770b9..4b973a1f1b7 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala @@ -84,7 +84,11 @@ object CarbonDataStoreCreator { writeDictionary(dataFilePath, table, absoluteTableIdentifier) val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table) val loadModel: CarbonLoadModel = new CarbonLoadModel() - loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor().getName()); + import scala.collection.JavaConverters._ + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor().getName()) + loadModel.setColumnCompressor(columnCompressor) loadModel.setCarbonDataLoadSchema(schema) loadModel.setDatabaseName( absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala index 66ce6f14d50..628a0dc8385 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala @@ -24,7 +24,7 @@ import java.util.Calendar import scala.util.Random -import org.apache.commons.lang3.RandomStringUtils +import org.apache.commons.lang3.{RandomStringUtils, StringUtils} import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} import org.apache.spark.sql.{CarbonEnv, Row, SaveMode} import org.apache.spark.sql.test.util.QueryTest @@ -32,6 +32,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.streaming.parser.CarbonStreamParser @@ -77,7 +78,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with } } - private def createTable(streaming: Boolean = false): Unit = { + private def createTable(streaming: Boolean = false, columnCompressor: String = ""): Unit = { sql(s"DROP TABLE IF EXISTS $tableName") sql( s""" @@ -100,6 +101,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with | ) | STORED BY 'carbondata' | TBLPROPERTIES( + | ${if (StringUtils.isBlank(columnCompressor)) "" else s"'${CarbonCommonConstants.COMPRESSOR}'='$columnCompressor',"} | ${if (streaming) "" else s"'LONG_STRING_COLUMNS'='longStringField',"} | 'SORT_COLUMNS'='stringSortField', | 'DICTIONARY_INCLUDE'='stringDictField', @@ -297,6 +299,30 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"))) } + test("test creating table with specified compressor") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + // the system configuration for compressor is snappy + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + // create table with zstd as compressor + createTable(columnCompressor = "zstd") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8))) + val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession) + val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR) + assert("zstd".equalsIgnoreCase(tableColumnCompressor)) + } + + test("test creating table with unsupported compressor") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + // the system configuration for compressor is snappy + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + // create table with unsupported compressor + val exception = intercept[InvalidConfigurationException] { + createTable (columnCompressor = "fakecompressor") + } + assert(exception.getMessage.contains("fakecompressor compressor is not supported")) + } + private def generateAllDataTypeFiles(lineNum: Int, csvDir: String, saveMode: SaveMode = SaveMode.Overwrite): Unit = { val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index f1f6006e8d7..b3826935edb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -272,7 +272,10 @@ object StreamSinkFactory { getConf.get("spark.driver.host") carbonLoadModel.setDictionaryServerHost(sparkDriverHost) carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt) - carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + val columnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) carbonLoadModel } } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 7a7994ba845..4f504804d29 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -289,7 +289,10 @@ object CarbonDataRDDFactory { loadModel.readAndSetLoadMetadataDetails() val loadStartTime = CarbonUpdateUtil.readCurrentTime() loadModel.setFactTimeStamp(loadStartTime) - loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + loadModel.setColumnCompressor(columnCompressor) loadModel } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 3408c0cca66..a13dfdcd720 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -160,7 +160,10 @@ case class CarbonAlterTableCompactionCommand( carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable) carbonLoadModel.setDatabaseName(table.getDatabaseName) carbonLoadModel.setTablePath(table.getTablePath) - carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) var storeLocation = System.getProperty("java.io.tmpdir") storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 45aef731c86..dc7ba2b4b26 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -207,7 +207,10 @@ case class CarbonLoadDataCommand( carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) - carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) val javaPartition = mutable.Map[String, String]() partition.foreach { case (k, v) => diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index 8d9a2f01b77..6c8b0b0469d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -124,7 +124,10 @@ case class CarbonAlterTableAddHivePartitionCommand( "Schema of index files located in location is not matching with current table schema") } val loadModel = new CarbonLoadModel - loadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + loadModel.setColumnCompressor(columnCompressor) loadModel.setCarbonTransactionalTable(true) loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table)) // Create new entry in tablestatus file diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index f0a5caf2d6d..b76a4853a28 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -146,7 +146,10 @@ case class CarbonAlterTableDropPartitionCommand( carbonLoadModel.setTablePath(table.getTablePath) val loadStartTime = CarbonUpdateUtil.readCurrentTime carbonLoadModel.setFactTimeStamp(loadStartTime) - carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) alterTableDropPartition( sparkSession.sqlContext, model.partitionId, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index d8630ae2550..753abaf033b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -142,9 +142,12 @@ case class CarbonAlterTableSplitPartitionCommand( LockUsage.ALTER_PARTITION_LOCK) locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) - val carbonLoadModel = new CarbonLoadModel() - carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + val carbonLoadModel = new CarbonLoadModel() + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) val tablePath = table.getTablePath val dataLoadSchema = new CarbonDataLoadSchema(table) carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 1beda112f1d..42ea0bda1d0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.table import scala.collection.JavaConverters._ +import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY @@ -26,6 +27,7 @@ import org.apache.spark.sql.execution.command.MetadataCommand import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier @@ -99,6 +101,18 @@ case class CarbonCreateTableCommand( throwMetadataException(dbName, tableName, "Table should have at least one column.") } + // Add validatation for column compressor when create table + val columnCompressor = tableInfo.getFactTable.getTableProperties.get( + CarbonCommonConstants.COMPRESSOR) + try { + if (null != columnCompressor) { + CompressorFactory.getInstance().getCompressor(columnCompressor) + } + } catch { + case ex : UnsupportedOperationException => + throw new InvalidConfigurationException(ex.getMessage) + } + val operationContext = new OperationContext val createTablePreExecutionEvent: CreateTablePreExecutionEvent = CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo)) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index 307b953de4b..b605a1dd56d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -88,7 +88,10 @@ with Serializable { val table = CarbonEnv.getCarbonTable( TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) val model = new CarbonLoadModel - model.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + model.setColumnCompressor(columnCompressor) val carbonProperty = CarbonProperties.getInstance() val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index 68b421d07e3..08c149b2191 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -71,7 +71,11 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { FileFactory.mkdirs(metadataDirectoryPath, fileType) } - carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + import scala.collection.JavaConverters._ + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) carbonLoadModel } diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index c02c28b207c..060afcacf9b 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -186,7 +186,11 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { FileFactory.mkdirs(metadataDirectoryPath, fileType) } - carbonLoadModel.setColumnCompressor(CompressorFactory.getInstance().getCompressor.getName) + import scala.collection.JavaConverters._ + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) carbonLoadModel } diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java index b66542fd373..74789283384 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java @@ -27,6 +27,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -177,7 +178,11 @@ private void initializeAtFirstRow() throws IOException, InterruptedException { } else { // IF the file is not existed, use the create api outputStream = FileFactory.getDataOutputStream(filePath, fileType); - compressorName = CompressorFactory.getInstance().getCompressor().getName(); + compressorName = carbonTable.getTableInfo().getFactTable().getTableProperties().get( + CarbonCommonConstants.COMPRESSOR); + if (null == compressorName) { + compressorName = CompressorFactory.getInstance().getCompressor().getName(); + } writeFileHeader(); }