From 8e541d624f316a25153b293199990f1e87be2575 Mon Sep 17 00:00:00 2001 From: kunal642 Date: Wed, 27 Jun 2018 12:28:04 +0530 Subject: [PATCH] updated rename command so that table directory is not renamed --- .../carbondata/core/util/CarbonUtil.java | 12 -- .../carbondata/core/util/CarbonUtilTest.java | 18 +-- .../hadoop/api/CarbonTableOutputFormat.java | 20 ++-- .../badrecordloger/BadRecordActionTest.scala | 14 +-- .../badrecordloger/BadRecordLoggerTest.scala | 9 +- .../TestNonTransactionalCarbonTable.scala | 12 +- ...StandardPartitionBadRecordLoggerTest.scala | 4 +- .../command/carbonTableSchemaCommon.scala | 24 +++- .../management/CarbonLoadDataCommand.scala | 8 ++ .../CarbonAlterTableRenameCommand.scala | 103 +----------------- .../table/CarbonCreateTableCommand.scala | 23 +++- .../table/CarbonDropTableCommand.scala | 9 ++ .../datasources/SparkCarbonTableFormat.scala | 7 ++ .../spark/sql/hive/CarbonFileMetastore.scala | 6 +- .../apache/spark/util/AlterTableUtil.scala | 18 +-- .../apache/spark/sql/hive/CarbonSQLConf.scala | 12 -- .../apache/spark/sql/hive/CarbonSqlConf.scala | 12 -- .../BadRecordPathLoadOptionTest.scala | 11 +- .../TestStreamingTableOperation.scala | 4 +- .../vectorreader/AddColumnTestCases.scala | 6 +- .../loading/BadRecordsLoggerProvider.java | 12 +- .../loading/model/CarbonLoadModelBuilder.java | 18 ++- .../processing/loading/model/LoadOption.java | 11 -- .../processing/util/CarbonBadRecordUtil.java | 8 +- .../sdk/file/CarbonWriterBuilder.java | 16 +++ 25 files changed, 162 insertions(+), 235 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 4e2c16fd4b9..84e44e15483 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -29,7 +29,6 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.block.AbstractIndex; @@ -354,17 +353,6 @@ public static void deleteFoldersAndFiles(final CarbonFile... file) }); } - public static String getBadLogPath(String storeLocation) { - String badLogStoreLocation = CarbonProperties.getInstance() - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); - if (null == badLogStoreLocation) { - badLogStoreLocation = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); - } - badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; - return badLogStoreLocation; - } - public static void deleteFoldersAndFilesSilent(final CarbonFile... 
file) throws IOException, InterruptedException { UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction() { diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index 5f8d1999247..ac4129268e6 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -206,15 +206,15 @@ public void testToDeleteFolderWithInterruptedException() assertTrue(!testDir.exists()); } - @Test public void testToGetBadLogPath() throws InterruptedException { - new MockUp() { - @SuppressWarnings("unused") @Mock public String getProperty(String key) { - return "../unibi-solutions/system/carbon/badRecords"; - } - }; - String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath"); - assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath"); - } +// @Test public void testToGetBadLogPath() throws InterruptedException { +// new MockUp() { +// @SuppressWarnings("unused") @Mock public String getProperty(String key) { +// return "../unibi-solutions/system/carbon/badRecords"; +// } +// }; +// String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath"); +// assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath"); +// } @Test public void testToDeleteFoldersAndFilesForCarbonFileSilently() throws IOException, InterruptedException { diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index e11efc41b5b..232be9e8d0b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -279,7 +279,8 @@ public static CarbonLoadModel 
getLoadModel(Configuration conf) throws IOExceptio model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf)); model.setTableName(CarbonTableOutputFormat.getTableName(conf)); model.setCarbonTransactionalTable(true); - model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf))); + CarbonTable carbonTable = getCarbonTable(conf); + model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)); model.setTablePath(getTablePath(conf)); setFileHeader(conf, model); @@ -345,14 +346,15 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))); - model.setBadRecordsLocation( - conf.get(BAD_RECORD_PATH, - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperty.getProperty( - CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))); - + String badRecordsPath = + carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_records_path"); + if (badRecordsPath != null) { + model.setBadRecordsLocation(badRecordsPath); + } else { + model.setBadRecordsLocation(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)); + } model.setUseOnePass( conf.getBoolean(IS_ONE_PASS_LOAD, Boolean.parseBoolean( diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala index 959aa6a559c..6401526d8c8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala @@ -129,31 +129,31 @@ class BadRecordActionTest extends QueryTest { sql("drop table if exists sales") sql( """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, - actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='')""") val exMessage = intercept[Exception] { sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + "('bad_records_action'='REDIRECT', 'DELIMITER'=" + - " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='','timestampformat'='yyyy/MM/dd')") + " ',', 'QUOTECHAR'= '\"' ,'timestampformat'='yyyy/MM/dd')") } - assert(exMessage.getMessage.contains("Invalid bad records location.")) + assert(exMessage.getMessage.contains("Cannot redirect bad records as bad record location is not provided.")) } test("test bad record REDIRECT but not having empty location in option should throw exception") { sql("drop table if exists sales") - sql( - """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, - actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") val badRecordLocation = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC) CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL) + sql( + """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") try { val exMessage = intercept[Exception] { sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" + "('bad_records_action'='REDIRECT', 'DELIMITER'=" + " ',', 'QUOTECHAR'= 
'\"','timestampformat'='yyyy/MM/dd')") } - assert(exMessage.getMessage.contains("Invalid bad records location.")) + assert(exMessage.getMessage.contains("Cannot redirect bad records as bad record location is not provided.")) } finally { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala index b6ba0e075fa..1a162cf8051 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala @@ -20,7 +20,7 @@ package org.apache.carbondata.spark.testsuite.badrecordloger import java.io.{File, FileFilter} import org.apache.commons.io.FileUtils -import org.apache.spark.sql.Row +import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.hive.HiveContext import org.scalatest.BeforeAndAfterAll @@ -270,10 +270,9 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll { } def getRedirectCsvPath(dbName: String, tableName: String, segment: String, task: String) = { - var badRecordLocation = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC) - badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName + "/" + segment + "/" + - task + val badRecordLocation = CarbonEnv + .getCarbonTable(Some(dbName), tableName)(sqlContext.sparkSession).getTableInfo.getFactTable + .getTableProperties.get("bad_records_path") + s"/$segment/$task" val listFiles = new File(badRecordLocation).listFiles(new FileFilter { override def accept(pathname: File): Boolean = { pathname.getPath.endsWith(".csv") diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 7a6a613b6fb..60a8d0236e8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -102,7 +102,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { def buildTestDataWithBadRecordRedirect(): Any = { FileUtils.deleteDirectory(new File(writerPath)) var options = Map("bAd_RECords_action" -> "REDIRECT").asJava - buildTestData(3, false, options) + buildTestData(3, false, options, List("name"), s"$writerPath/../badRecords") } def buildTestDataWithSortColumns(sortColumns: List[String]): Any = { @@ -118,7 +118,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { def buildTestData(rows: Int, persistSchema: Boolean, options: util.Map[String, String], - sortColumns: List[String]): Any = { + sortColumns: List[String], + badRecordsPath: String = ""): Any = { val schema = new StringBuilder() .append("[ \n") .append(" {\"name\":\"string\"},\n") @@ -135,6 +136,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { builder .sortBy(sortColumns.toArray) .outputPath(writerPath) + .badRecordsPath(badRecordsPath) .isTransactionalTable(false) .uniqueIdentifier(System.currentTimeMillis) .buildWriterForCSVInput(Schema.parseJson(schema)) @@ -143,6 +145,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { builder.outputPath(writerPath) .isTransactionalTable(false) .sortBy(sortColumns.toArray) + .badRecordsPath(badRecordsPath) .uniqueIdentifier( 
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options) .buildWriterForCSVInput(Schema.parseJson(schema)) @@ -150,6 +153,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { builder.outputPath(writerPath) .isTransactionalTable(false) .sortBy(sortColumns.toArray) + .badRecordsPath(badRecordsPath) .uniqueIdentifier( System.currentTimeMillis).withBlockSize(2) .buildWriterForCSVInput(Schema.parseJson(schema)) @@ -286,7 +290,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'") var i =0; while (i<50){ - sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false) + sql (s"""insert into t1 values ("aaaaa", 12, 20)""") i = i+1; } checkAnswer(sql("select count(*) from t1"),Seq(Row(50))) @@ -885,7 +889,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS sdkOutputTable") sql( s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION - |'$writerPath' """.stripMargin) + |'$writerPath'""".stripMargin) checkAnswer(sql("select * from sdkOutputTable"), Seq( Row("robot0", null, null), Row("robot1", null, null), diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala index 8e1f13b86e9..d9e5d3c5f51 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala @@ -43,8 +43,8 @@ class StandardPartitionBadRecordLoggerTest extends 
QueryTest with BeforeAndAfter test("test partition redirect") { sql( - """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, - actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'""") + s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, + actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""") val csvFilePath = s"$resourcesPath/badrecords/datasample.csv" sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index c77d0df5ecc..18de005f9d0 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType} @@ -39,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation} import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator 
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} -import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.CompactionType @@ -694,6 +696,11 @@ class TableNewProcessor(cm: TableModel) { } // Add table comment to table properties tablePropertiesMap.put("comment", cm.tableComment.getOrElse("")) + val badRecordsPath = getBadRecordsPath(tablePropertiesMap, + cm.tableName, + tableSchema.getTableId, + cm.databaseNameOp.getOrElse("default")) + tablePropertiesMap.put("bad_records_path", badRecordsPath) tableSchema.setTableProperties(tablePropertiesMap) if (cm.bucketFields.isDefined) { val bucketCols = cm.bucketFields.get.bucketColumns.map { b => @@ -739,6 +746,21 @@ class TableNewProcessor(cm: TableModel) { tableInfo } + private def getBadRecordsPath(tablePropertiesMap: util.HashMap[String, String], + tableName: String, + tableId: String, + databaseName: String): String = { + val badRecordsPath = tablePropertiesMap + .getOrDefault("bad_records_path", + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)) + if (badRecordsPath == null || badRecordsPath.isEmpty) { + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL + } else { + badRecordsPath + CarbonCommonConstants.FILE_SEPARATOR + databaseName + + CarbonCommonConstants.FILE_SEPARATOR + s"${tableName}_$tableId" + } + } + /** * Method to check to get the encoder from parent or not * @param field column field diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 
38bdbcfc9e7..30317fdd516 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -182,6 +182,14 @@ case class CarbonLoadDataCommand( carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) + optionsFinal + .put("bad_record_path", + tableProperties + .getOrDefault("bad_records_path", + CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))) + val factPath = if (dataFrame.isDefined) { "" } else { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 779b937b1d1..3e4548cd7f0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.command.schema -import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition @@ -27,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory import 
org.apache.carbondata.core.exception.ConcurrentOperationException @@ -36,8 +34,6 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.SchemaEvolutionEntry @@ -126,14 +122,11 @@ private[sql] case class CarbonAlterTableRenameCommand( schemaEvolutionEntry.setTableName(newTableName) timeStamp = System.currentTimeMillis() schemaEvolutionEntry.setTime_stamp(timeStamp) - renameBadRecords(oldTableName, newTableName, oldDatabaseName) val fileType = FileFactory.getFileType(tableMetadataFile) val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName, newTableName, carbonTable.getCarbonTableIdentifier.getTableId) val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName)) val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName)) - var newTablePath = CarbonTablePath.getNewTablePath( - oldTableIdentifier.getTablePath, newTableIdentifier.getTableName) metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) var partitions: Seq[CatalogTablePartition] = Seq.empty if (carbonTable.isHivePartitionTable) { @@ -144,43 +137,9 @@ private[sql] case class CarbonAlterTableRenameCommand( sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename( oldIdentifier, newIdentifier, - newTablePath) - // changed the rename order to deal with situation when carbon table and hive table - // will point to the same tablePath - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val rename = 
FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType) - .renameForce( - CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName)) - if (!rename) { - renameBadRecords(newTableName, oldTableName, oldDatabaseName) - sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName") - } - } - val updatedParts = updatePartitionLocations( - partitions, - oldTableIdentifier.getTablePath, - newTablePath, - sparkSession, - newIdentifier.table, - oldDatabaseName) + oldTableIdentifier.getTablePath) - val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier) - // Update the storage location with new path - sparkSession.sessionState.catalog.alterTable( - catalogTable.copy(storage = sparkSession.sessionState.catalog. - asInstanceOf[CarbonSessionCatalog].updateStorageLocation( - new Path(newTablePath), - catalogTable.storage, - newIdentifier.table, - oldDatabaseName))) - if (updatedParts.nonEmpty) { - // Update the new updated partitions specs with new location. 
- sparkSession.sessionState.catalog.alterPartitions( - newIdentifier, - updatedParts) - } - - newTablePath = metastore.updateTableSchemaForAlter( + metastore.updateTableSchemaForAlter( newTableIdentifier, carbonTable.getCarbonTableIdentifier, tableInfo, @@ -190,12 +149,11 @@ private[sql] case class CarbonAlterTableRenameCommand( val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent( carbonTable, alterTableRenameModel, - newTablePath, + oldTableIdentifier.getTablePath, sparkSession) OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext) sparkSession.catalog.refreshTable(newIdentifier.quotedString) - carbonTableLockFilePath = newTablePath LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName") LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") } catch { @@ -209,7 +167,6 @@ private[sql] case class CarbonAlterTableRenameCommand( carbonTable, timeStamp)( sparkSession) - renameBadRecords(newTableName, oldTableName, oldDatabaseName) } throwMetadataException(oldDatabaseName, oldTableName, s"Alter table rename table operation failed: ${e.getMessage}") @@ -218,62 +175,10 @@ private[sql] case class CarbonAlterTableRenameCommand( if (carbonTable != null) { AlterTableUtil .releaseLocksManually(locks, - locksToBeAcquired, - oldDatabaseName, - newTableName, - carbonTableLockFilePath) + locksToBeAcquired) } } Seq.empty } - /** - * Update partitions with new table location - * - */ - private def updatePartitionLocations( - partitions: Seq[CatalogTablePartition], - oldTablePath: String, - newTablePath: String, - sparkSession: SparkSession, - newTableName: String, - dbName: String): Seq[CatalogTablePartition] = { - partitions.map{ part => - if (part.storage.locationUri.isDefined) { - val path = new Path(part.location) - if (path.toString.contains(oldTablePath)) { - val newPath = new Path(path.toString.replace(oldTablePath, newTablePath)) - part.copy(storage = 
sparkSession.sessionState.catalog. - asInstanceOf[CarbonSessionCatalog].updateStorageLocation( - newPath, - part.storage, - newTableName, - dbName)) - } else { - part - } - } else { - part - } - } - } - - private def renameBadRecords( - oldTableName: String, - newTableName: String, - dataBaseName: String): Unit = { - val oldPath = CarbonUtil - .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName) - val newPath = CarbonUtil - .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName) - val fileType = FileFactory.getFileType(oldPath) - if (FileFactory.isFileExist(oldPath, fileType)) { - val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType) - .renameForce(newPath) - if (!renameSuccess) { - sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName") - } - } - } - } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 16e99b56c48..63be91e8099 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -26,12 +26,13 @@ import org.apache.spark.sql.execution.command.MetadataCommand import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} 
-import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.spark.util.CarbonSparkUtil @@ -41,7 +42,8 @@ case class CarbonCreateTableCommand( tableLocation: Option[String] = None, isExternal : Boolean = false, createDSTable: Boolean = true, - isVisible: Boolean = true) + isVisible: Boolean = true, + badRecordsPath: Option[String] = None) extends MetadataCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { @@ -56,7 +58,7 @@ case class CarbonCreateTableCommand( tableInfo.setDatabaseName(dbName) tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName)) LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tableName]") - + val isTransactionalTable = tableInfo.isTransactionalTable if (sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { if (!ifNotExistsSet) { @@ -66,8 +68,14 @@ case class CarbonCreateTableCommand( throw new TableAlreadyExistsException(dbName, tableName) } } else { - val tablePath = tableLocation.getOrElse( + val path = tableLocation.getOrElse( CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)) + val tablePath = if (FileFactory.getCarbonFile(path).exists() && !isExternal && + !isTransactionalTable) { + path + "_" + tableInfo.getFactTable.getTableId + } else { + path + } tableInfo.setTablePath(tablePath) val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName) @@ -90,7 +98,6 @@ case class CarbonCreateTableCommand( OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier) - val isTransactionalTable 
= tableInfo.isTransactionalTable if (createDSTable) { try { val tablePath = tableIdentifier.getTablePath @@ -118,6 +125,11 @@ } else { "" } + val bad_records_path = if (badRecordsPath.nonEmpty) { + badRecordsPath.get + } else { + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC) + } // isVisible property is added to hive table properties to differentiate between main // table and datamaps(like preaggregate). It is false only for datamaps. This is added // to improve the show tables performance when filtering the datamaps from main tables @@ -132,6 +144,7 @@ | dbName "$dbName", | tablePath "$tablePath", | path "$tablePath", + | badRecordsPath "$bad_records_path", | isExternal "$isExternal", | isTransactional "$isTransactionalTable", | isVisible "$isVisible" diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 776750bb62a..22496bac4d0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.table import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer +import org.apache.commons.lang.StringUtils import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException @@ -179,6 +180,14 @@ case class CarbonDropTableCommand( val file = FileFactory.getCarbonFile(tablePath, fileType) CarbonUtil.deleteFoldersAndFilesSilent(file) } + // delete bad record path if it exists + val badRecordsLocation = 
carbonTable.getTableInfo.getFactTable.getTableProperties + .get("bad_records_path") + if (StringUtils.isNotEmpty(badRecordsLocation) && + FileFactory.isFileExist(badRecordsLocation)) { + val file = FileFactory.getCarbonFile(badRecordsLocation) + CarbonUtil.deleteFoldersAndFilesSilent(file) + } if (carbonTable.hasDataMapSchema && childDropCommands.nonEmpty) { // drop all child tables childDropCommands.foreach(_.processData(sparkSession)) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index 42f1f7780af..d708aab86df 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -92,6 +92,13 @@ with Serializable { carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) + optionsFinal + .put("bad_record_path", + tableProperties + .getOrDefault("bad_records_path", + CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))) val partitionStr = table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map( _.getColumnName.toLowerCase).mkString(",") diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 81a6bedc1db..2e3e82dba06 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -286,15 +286,13 @@ class 
CarbonFileMetastore extends CarbonMetaStore { if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) } - val newTablePath = CarbonTablePath.getNewTablePath( - identifier.getTablePath, newTableIdentifier.getTableName) val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, - newTablePath) + identifier.getTablePath) val newAbsoluteTableIdentifier = AbsoluteTableIdentifier.from( - newTablePath, + identifier.getTablePath, newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, oldTableIdentifier.getTableId) diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index c291ae22b0e..da431bbbd9b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -99,19 +99,10 @@ object AlterTableUtil { * * @param locks * @param locksAcquired - * @param dbName - * @param tableName - * @param tablePath */ - def releaseLocksManually(locks: List[ICarbonLock], - locksAcquired: List[String], - dbName: String, - tableName: String, - tablePath: String): Unit = { - val lockLocation = tablePath + def releaseLocksManually(locks: List[ICarbonLock], locksAcquired: List[String]): Unit = { locks.zip(locksAcquired).foreach { case (carbonLock, lockType) => - val lockFilePath = CarbonTablePath.getLockFilePath(lockLocation, lockType) - if (carbonLock.releaseLockManually(lockFilePath)) { + if (carbonLock.unlock()) { LOGGER.info(s"Alter table lock released successfully: ${ lockType }") } else { LOGGER.error("Unable to release lock during alter table operation") @@ -187,7 +178,6 @@ object AlterTableUtil { val oldCarbonTableIdentifier = oldCarbonTable.getCarbonTableIdentifier val database = 
oldCarbonTable.getDatabaseName val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId) - val newTablePath = CarbonTablePath.getNewTablePath(tablePath, newTableName) val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val fileType = FileFactory.getFileType(tablePath) if (FileFactory.isFileExist(tablePath, fileType)) { @@ -196,10 +186,8 @@ object AlterTableUtil { val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { LOGGER.error(s"Reverting changes for $database.${oldCarbonTable.getTableName}") - FileFactory.getCarbonFile(tablePath, fileType) - .renameForce(CarbonTablePath.getNewTablePath(tablePath, oldCarbonTable.getTableName)) val absoluteTableIdentifier = AbsoluteTableIdentifier.from( - newTablePath, + tablePath, newCarbonTableIdentifier) metastore.revertTableSchemaInAlterFailure(oldCarbonTableIdentifier, tableInfo, absoluteTableIdentifier)(sparkSession) diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala index 15ccb0cde9c..6cb9b5e14cd 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala @@ -84,12 +84,6 @@ class CarbonSQLConf(sparkSession: SparkSession) { .doc("Property to enable/disable single_pass.") .booleanConf .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - val BAD_RECORD_PATH = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) - .doc("Property to configure the bad record location.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) val GLOBAL_SORT_PARTITIONS = 
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) .doc("Property to configure the global sort partitions.") @@ -134,12 +128,6 @@ class CarbonSQLConf(sparkSession: SparkSession) { CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala index 2128ffd0e77..798641c40e4 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala @@ -83,12 +83,6 @@ class CarbonSQLConf(sparkSession: SparkSession) { .doc("Property to enable/disable single_pass.") .booleanConf .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - val BAD_RECORD_PATH = - buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) - .doc("Property to configure the bad record location.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - 
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) val GLOBAL_SORT_PARTITIONS = buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) .doc("Property to configure the global sort partitions.") @@ -133,12 +127,6 @@ class CarbonSQLConf(sparkSession: SparkSession) { CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala index a59ae679414..986365e4a53 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala @@ -19,6 +19,7 @@ package org.apache.spark.carbondata import java.io.File +import org.apache.spark.sql.CarbonEnv import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.hive.HiveContext import org.scalatest.BeforeAndAfterAll @@ -44,8 +45,8 @@ class BadRecordPathLoadOptionTest extends Spark2QueryTest with BeforeAndAfterAll test("data load log file and csv file 
written at the configured location") { sql( - """CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String, - actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + s"""CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") val csvFilePath = s"$resourcesPath/badrecords/datasample.csv" @@ -63,9 +64,9 @@ class BadRecordPathLoadOptionTest extends Spark2QueryTest with BeforeAndAfterAll } def isFilesWrittenAtBadStoreLocation: Boolean = { - val badStorePath = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC) + - "/default/salestest/0/0" + val badStorePath = + CarbonEnv.getCarbonTable(Some("default"), "salestest")(sqlContext.sparkSession).getTableInfo + .getFactTable.getTableProperties.get("bad_records_path") + "/0/0" val carbonFile: CarbonFile = FileFactory .getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath)) var exists: Boolean = carbonFile.exists() diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 9d9a9f525e6..efc1a88bfba 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -2199,7 +2199,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { | ) | STORED BY 'carbondata' | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" } - | 'sort_columns'='name', 'dictionary_include'='city,register') + | 'sort_columns'='name', 
'dictionary_include'='city,register', 'BAD_RECORDS_PATH'='$badRecordFilePath') | """.stripMargin) if (withBatchLoad) { @@ -2228,7 +2228,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { | ) | STORED BY 'carbondata' | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" } - | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated') + | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'BAD_RECORDS_PATH'='$badRecordFilePath') | """.stripMargin) if (withBatchLoad) { diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index d94570a0f06..ba42670a23a 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -21,13 +21,14 @@ import java.io.{File, FileOutputStream, FileWriter} import java.math.{BigDecimal, RoundingMode} import java.sql.{Date, Timestamp} -import org.apache.spark.sql.Row +import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.test.TestQueryExecutor import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.exception.ProcessMetaDataException @@ -679,6 +680,9 @@ class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { sql("alter table t5 rename to t6") sql("create table t5 (c1 string, c2 int,c3 string) stored by 
'carbondata'") sql("insert into t5 select 'asd',1,'sdf'") + val t5: CarbonTable = CarbonEnv.getCarbonTable(None, "t5")(sqlContext.sparkSession) + assert(t5.getTablePath + .contains(t5.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId)) checkAnswer(sql("select * from t5"),Seq(Row("asd",1,"sdf"))) } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java index c2ddff80b90..d55f2ed0a8f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java @@ -23,7 +23,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; /** @@ -74,8 +73,7 @@ public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration String storeLocation = ""; if (configuration.isCarbonTransactionalTable()) { storeLocation = - identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId() + configuration.getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo(); } else { storeLocation = @@ -92,13 +90,7 @@ public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configur String storeLocation) { String badLogStoreLocation = (String) configuration .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); - if (null == badLogStoreLocation) { - badLogStoreLocation = - 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); - } - badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; - - return badLogStoreLocation; + return badLogStoreLocation + File.separator + storeLocation; } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index a88ce602266..55b5df7f66b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -60,7 +60,7 @@ public CarbonLoadModelBuilder(CarbonTable table) { * @param taskNo * @return a new CarbonLoadModel instance */ - public CarbonLoadModel build(Map options, long UUID, String taskNo) + public CarbonLoadModel build(Map options, long UUID, String taskNo) throws InvalidLoadOptionException, IOException { Map optionsFinal = LoadOption.fillOptionWithDefaultValue(options); @@ -72,6 +72,14 @@ public CarbonLoadModel build(Map options, long UUID, String task } optionsFinal.put("fileheader", Strings.mkString(columns, ",")); } + String badRecordsPath = + table.getTableInfo().getFactTable().getTableProperties().get("bad_record_path"); + if (StringUtils.isEmpty(badRecordsPath)) { + badRecordsPath = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL); + } + optionsFinal.put("bad_record_path", badRecordsPath); CarbonLoadModel model = new CarbonLoadModel(); model.setCarbonTransactionalTable(table.isTransactionalTable()); model.setFactTimeStamp(UUID); @@ -163,10 +171,12 @@ public void build( if (Boolean.parseBoolean(bad_records_logger_enable) || LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) { - if 
(!CarbonUtil.isValidBadStorePath(bad_record_path)) { - throw new InvalidLoadOptionException("Invalid bad records location."); + if (!StringUtils.isEmpty(bad_record_path)) { + bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path); + } else { + throw new InvalidLoadOptionException( + "Cannot redirect bad records as bad record location is not provided."); } - bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path); } carbonLoadModel.setBadRecordsLocation(bad_record_path); diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java index 17c3651b8fa..97338167a08 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -159,17 +159,6 @@ public static Map fillOptionWithDefaultValue( CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))); - optionsFinal.put( - "bad_record_path", - Maps.getOrDefault( - options, - "bad_record_path", - CarbonProperties.getInstance().getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))); - String useOnePass = Maps.getOrDefault( options, "single_pass", diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java index c494eef5c43..e38c9c52ec9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java @@ -27,7 +27,6 @@ import 
org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.loading.BadRecordsLogger; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; @@ -47,14 +46,11 @@ public class CarbonBadRecordUtil { */ public static void renameBadRecord(CarbonDataLoadConfiguration configuration) { // rename the bad record in progress to normal - CarbonTableIdentifier identifier = - configuration.getTableIdentifier().getCarbonTableIdentifier(); String storeLocation = ""; if (configuration.isCarbonTransactionalTable()) { storeLocation = - identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId() - + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo(); + configuration.getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration + .getTaskNo(); } else { storeLocation = "SdkWriterBadRecords" + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo(); diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 0f670fe3f55..3579c957ce8 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -32,6 +32,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import 
org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.converter.SchemaConverter; @@ -44,6 +45,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchema; import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -67,6 +69,7 @@ public class CarbonWriterBuilder { private long UUID; private Map options; private String taskNo; + private String badRecordsPath = ""; /** * Sets the output path of the writer builder @@ -92,6 +95,13 @@ public CarbonWriterBuilder sortBy(String[] sortColumns) { return this; } + /** + */ + public CarbonWriterBuilder badRecordsPath(String badRecordsPath) { + this.badRecordsPath = badRecordsPath; + return this; + } + /** * sets the taskNo for the writer. SDKs concurrently running * will set taskNo in order to avoid conflicts in file's name during write. @@ -414,6 +424,12 @@ private CarbonTable buildCarbonTable() { tableName = "_tempTable_" + String.valueOf(UUID); } TableSchema schema = tableSchemaBuilder.build(); + if (badRecordsPath.isEmpty()) { + badRecordsPath = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL); + } + schema.getTableProperties().put("bad_records_path", badRecordsPath); schema.setTableName(tableName); CarbonTable table = CarbonTable.builder().tableName(schema.getTableName()).databaseName(dbName).tablePath(path)