Skip to content

Commit

Permalink
updated rename command so that table directory is not renamed
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Jun 28, 2018
1 parent afcaecf commit 96a46e9
Show file tree
Hide file tree
Showing 21 changed files with 123 additions and 224 deletions.
12 changes: 0 additions & 12 deletions core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
Expand Down Expand Up @@ -354,17 +353,6 @@ public static void deleteFoldersAndFiles(final CarbonFile... file)
});
}

public static String getBadLogPath(String storeLocation) {
String badLogStoreLocation = CarbonProperties.getInstance()
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
if (null == badLogStoreLocation) {
badLogStoreLocation =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
}
badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
return badLogStoreLocation;
}

public static void deleteFoldersAndFilesSilent(final CarbonFile... file)
throws IOException, InterruptedException {
UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,6 @@ public void testToDeleteFolderWithInterruptedException()
assertTrue(!testDir.exists());
}

@Test public void testToGetBadLogPath() throws InterruptedException {
new MockUp<CarbonProperties>() {
@SuppressWarnings("unused") @Mock public String getProperty(String key) {
return "../unibi-solutions/system/carbon/badRecords";
}
};
String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath");
assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath");
}

@Test public void testToDeleteFoldersAndFilesForCarbonFileSilently()
throws IOException, InterruptedException {
LocalCarbonFile testDir = new LocalCarbonFile("../core/src/test/resources/testDir");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
model.setTableName(CarbonTableOutputFormat.getTableName(conf));
model.setCarbonTransactionalTable(true);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf)));
CarbonTable carbonTable = getCarbonTable(conf);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable));
model.setTablePath(getTablePath(conf));

setFileHeader(conf, model);
Expand Down Expand Up @@ -345,14 +346,19 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));

model.setBadRecordsLocation(
conf.get(BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));

String badRecordsPath =
carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
if (badRecordsPath != null) {
model.setBadRecordsLocation(badRecordsPath);
} else {
model.setBadRecordsLocation(
conf.get(BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
}
model.setUseOnePass(
conf.getBoolean(IS_ONE_PASS_LOAD,
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,7 @@ class BadRecordActionTest extends QueryTest {
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='','timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
}

test("test bad record REDIRECT but not having empty location in option should throw exception") {
sql("drop table if exists sales")
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
val badRecordLocation = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
try {
val exMessage = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
}
finally {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
badRecordLocation)
}
assert(exMessage.getMessage.contains("Cannot redirect bad records as bad record location is not provided."))
}

test("test bad record is REDIRECT with location in carbon properties should pass") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.carbondata.spark.testsuite.badrecordloger
import java.io.{File, FileFilter}

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.hive.HiveContext
import org.scalatest.BeforeAndAfterAll

Expand Down Expand Up @@ -270,10 +270,9 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
}

def getRedirectCsvPath(dbName: String, tableName: String, segment: String, task: String) = {
var badRecordLocation = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName + "/" + segment + "/" +
task
val badRecordLocation = CarbonEnv
.getCarbonTable(Some(dbName), tableName)(sqlContext.sparkSession).getTableInfo.getFactTable
.getTableProperties.get("bad_records_path") + s"/$segment/$task"
val listFiles = new File(badRecordLocation).listFiles(new FileFilter {
override def accept(pathname: File): Boolean = {
pathname.getPath.endsWith(".csv")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter

test("test partition redirect") {
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'""")
s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")

val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
Expand All @@ -39,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CompactionType
Expand Down Expand Up @@ -694,6 +696,11 @@ class TableNewProcessor(cm: TableModel) {
}
// Add table comment to table properties
tablePropertiesMap.put("comment", cm.tableComment.getOrElse(""))
val badRecordsPath = getBadRecordsPath(tablePropertiesMap,
cm.tableName,
tableSchema.getTableId,
cm.databaseNameOp.getOrElse("default"))
tablePropertiesMap.put("bad_records_path", badRecordsPath)
tableSchema.setTableProperties(tablePropertiesMap)
if (cm.bucketFields.isDefined) {
val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
Expand Down Expand Up @@ -739,6 +746,21 @@ class TableNewProcessor(cm: TableModel) {
tableInfo
}

private def getBadRecordsPath(tablePropertiesMap: util.HashMap[String, String],
tableName: String,
tableId: String,
databaseName: String): String = {
val badRecordsPath = tablePropertiesMap
.getOrDefault("bad_records_path",
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC))
if (badRecordsPath == null || badRecordsPath.isEmpty) {
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
} else {
badRecordsPath + CarbonCommonConstants.FILE_SEPARATOR + databaseName +
CarbonCommonConstants.FILE_SEPARATOR + s"${tableName}_$tableId"
}
}

/**
* Method to check to get the encoder from parent or not
* @param field column field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ case class CarbonLoadDataCommand(
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))

optionsFinal
.put("bad_record_path",
options.getOrElse("bad_record_path",
tableProperties
.getOrDefault("bad_records_path",
CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))

val factPath = if (dataFrame.isDefined) {
""
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.command.schema

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
Expand All @@ -27,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
Expand All @@ -36,8 +34,6 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry

Expand Down Expand Up @@ -126,14 +122,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
schemaEvolutionEntry.setTableName(newTableName)
timeStamp = System.currentTimeMillis()
schemaEvolutionEntry.setTime_stamp(timeStamp)
renameBadRecords(oldTableName, newTableName, oldDatabaseName)
val fileType = FileFactory.getFileType(tableMetadataFile)
val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
var newTablePath = CarbonTablePath.getNewTablePath(
oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
var partitions: Seq[CatalogTablePartition] = Seq.empty
if (carbonTable.isHivePartitionTable) {
Expand All @@ -144,43 +137,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
oldIdentifier,
newIdentifier,
newTablePath)
// changed the rename order to deal with situation when carbon table and hive table
// will point to the same tablePath
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType)
.renameForce(
CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName))
if (!rename) {
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
}
}
val updatedParts = updatePartitionLocations(
partitions,
oldTableIdentifier.getTablePath,
newTablePath,
sparkSession,
newIdentifier.table,
oldDatabaseName)
oldTableIdentifier.getTablePath)

val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier)
// Update the storage location with new path
sparkSession.sessionState.catalog.alterTable(
catalogTable.copy(storage = sparkSession.sessionState.catalog.
asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
new Path(newTablePath),
catalogTable.storage,
newIdentifier.table,
oldDatabaseName)))
if (updatedParts.nonEmpty) {
// Update the new updated partitions specs with new location.
sparkSession.sessionState.catalog.alterPartitions(
newIdentifier,
updatedParts)
}

newTablePath = metastore.updateTableSchemaForAlter(
metastore.updateTableSchemaForAlter(
newTableIdentifier,
carbonTable.getCarbonTableIdentifier,
tableInfo,
Expand All @@ -190,12 +149,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
carbonTable,
alterTableRenameModel,
newTablePath,
oldTableIdentifier.getTablePath,
sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)

sparkSession.catalog.refreshTable(newIdentifier.quotedString)
carbonTableLockFilePath = newTablePath
LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
} catch {
Expand All @@ -209,7 +167,6 @@ private[sql] case class CarbonAlterTableRenameCommand(
carbonTable,
timeStamp)(
sparkSession)
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
}
throwMetadataException(oldDatabaseName, oldTableName,
s"Alter table rename table operation failed: ${e.getMessage}")
Expand All @@ -218,62 +175,10 @@ private[sql] case class CarbonAlterTableRenameCommand(
if (carbonTable != null) {
AlterTableUtil
.releaseLocksManually(locks,
locksToBeAcquired,
oldDatabaseName,
newTableName,
carbonTableLockFilePath)
locksToBeAcquired)
}
}
Seq.empty
}

/**
* Update partitions with new table location
*
*/
private def updatePartitionLocations(
partitions: Seq[CatalogTablePartition],
oldTablePath: String,
newTablePath: String,
sparkSession: SparkSession,
newTableName: String,
dbName: String): Seq[CatalogTablePartition] = {
partitions.map{ part =>
if (part.storage.locationUri.isDefined) {
val path = new Path(part.location)
if (path.toString.contains(oldTablePath)) {
val newPath = new Path(path.toString.replace(oldTablePath, newTablePath))
part.copy(storage = sparkSession.sessionState.catalog.
asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
newPath,
part.storage,
newTableName,
dbName))
} else {
part
}
} else {
part
}
}
}

private def renameBadRecords(
oldTableName: String,
newTableName: String,
dataBaseName: String): Unit = {
val oldPath = CarbonUtil
.getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
val newPath = CarbonUtil
.getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
val fileType = FileFactory.getFileType(oldPath)
if (FileFactory.isFileExist(oldPath, fileType)) {
val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
.renameForce(newPath)
if (!renameSuccess) {
sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
}
}
}

}
Loading

0 comments on commit 96a46e9

Please sign in to comment.