Skip to content

Commit

Permalink
updated rename command so that table directory is not renamed
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Jun 28, 2018
1 parent afcaecf commit 8e541d6
Show file tree
Hide file tree
Showing 25 changed files with 162 additions and 235 deletions.
12 changes: 0 additions & 12 deletions core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
Expand Down Expand Up @@ -354,17 +353,6 @@ public static void deleteFoldersAndFiles(final CarbonFile... file)
});
}

public static String getBadLogPath(String storeLocation) {
String badLogStoreLocation = CarbonProperties.getInstance()
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
if (null == badLogStoreLocation) {
badLogStoreLocation =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
}
badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
return badLogStoreLocation;
}

public static void deleteFoldersAndFilesSilent(final CarbonFile... file)
throws IOException, InterruptedException {
UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@ public void testToDeleteFolderWithInterruptedException()
assertTrue(!testDir.exists());
}

@Test public void testToGetBadLogPath() throws InterruptedException {
new MockUp<CarbonProperties>() {
@SuppressWarnings("unused") @Mock public String getProperty(String key) {
return "../unibi-solutions/system/carbon/badRecords";
}
};
String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath");
assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath");
}
// @Test public void testToGetBadLogPath() throws InterruptedException {
// new MockUp<CarbonProperties>() {
// @SuppressWarnings("unused") @Mock public String getProperty(String key) {
// return "../unibi-solutions/system/carbon/badRecords";
// }
// };
// String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath");
// assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath");
// }

@Test public void testToDeleteFoldersAndFilesForCarbonFileSilently()
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
model.setTableName(CarbonTableOutputFormat.getTableName(conf));
model.setCarbonTransactionalTable(true);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf)));
CarbonTable carbonTable = getCarbonTable(conf);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable));
model.setTablePath(getTablePath(conf));

setFileHeader(conf, model);
Expand Down Expand Up @@ -345,14 +346,15 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));

model.setBadRecordsLocation(
conf.get(BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));

String badRecordsPath =
carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
if (badRecordsPath != null) {
model.setBadRecordsLocation(badRecordsPath);
} else {
model.setBadRecordsLocation(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL));
}
model.setUseOnePass(
conf.getBoolean(IS_ONE_PASS_LOAD,
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,31 +129,31 @@ class BadRecordActionTest extends QueryTest {
sql("drop table if exists sales")
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='')""")
val exMessage = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='','timestampformat'='yyyy/MM/dd')")
" ',', 'QUOTECHAR'= '\"' ,'timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
assert(exMessage.getMessage.contains("Cannot redirect bad records as bad record location is not provided."))
}

test("test bad record REDIRECT but not having empty location in option should throw exception") {
sql("drop table if exists sales")
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
val badRecordLocation = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
try {
val exMessage = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
assert(exMessage.getMessage.contains("Cannot redirect bad records as bad record location is not provided."))
}
finally {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.carbondata.spark.testsuite.badrecordloger
import java.io.{File, FileFilter}

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.hive.HiveContext
import org.scalatest.BeforeAndAfterAll

Expand Down Expand Up @@ -270,10 +270,9 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
}

def getRedirectCsvPath(dbName: String, tableName: String, segment: String, task: String) = {
var badRecordLocation = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName + "/" + segment + "/" +
task
val badRecordLocation = CarbonEnv
.getCarbonTable(Some(dbName), tableName)(sqlContext.sparkSession).getTableInfo.getFactTable
.getTableProperties.get("bad_records_path") + s"/$segment/$task"
val listFiles = new File(badRecordLocation).listFiles(new FileFilter {
override def accept(pathname: File): Boolean = {
pathname.getPath.endsWith(".csv")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
def buildTestDataWithBadRecordRedirect(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
var options = Map("bAd_RECords_action" -> "REDIRECT").asJava
buildTestData(3, false, options)
buildTestData(3, false, options, List("name"), s"$writerPath/../badRecords")
}

def buildTestDataWithSortColumns(sortColumns: List[String]): Any = {
Expand All @@ -118,7 +118,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
def buildTestData(rows: Int,
persistSchema: Boolean,
options: util.Map[String, String],
sortColumns: List[String]): Any = {
sortColumns: List[String],
badRecordsPath: String = ""): Any = {
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
Expand All @@ -135,6 +136,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
builder
.sortBy(sortColumns.toArray)
.outputPath(writerPath)
.badRecordsPath(badRecordsPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis)
.buildWriterForCSVInput(Schema.parseJson(schema))
Expand All @@ -143,13 +145,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.sortBy(sortColumns.toArray)
.badRecordsPath(badRecordsPath)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
.buildWriterForCSVInput(Schema.parseJson(schema))
} else {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.sortBy(sortColumns.toArray)
.badRecordsPath(badRecordsPath)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
.buildWriterForCSVInput(Schema.parseJson(schema))
Expand Down Expand Up @@ -286,7 +290,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
var i =0;
while (i<50){
sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
sql (s"""insert into t1 values ("aaaaa", 12, 20)""")
i = i+1;
}
checkAnswer(sql("select count(*) from t1"),Seq(Row(50)))
Expand Down Expand Up @@ -885,7 +889,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
|'$writerPath'""".stripMargin)
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", null, null),
Row("robot1", null, null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter

test("test partition redirect") {
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'""")
s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")

val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
Expand All @@ -39,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CompactionType
Expand Down Expand Up @@ -694,6 +696,11 @@ class TableNewProcessor(cm: TableModel) {
}
// Add table comment to table properties
tablePropertiesMap.put("comment", cm.tableComment.getOrElse(""))
val badRecordsPath = getBadRecordsPath(tablePropertiesMap,
cm.tableName,
tableSchema.getTableId,
cm.databaseNameOp.getOrElse("default"))
tablePropertiesMap.put("bad_records_path", badRecordsPath)
tableSchema.setTableProperties(tablePropertiesMap)
if (cm.bucketFields.isDefined) {
val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
Expand Down Expand Up @@ -739,6 +746,21 @@ class TableNewProcessor(cm: TableModel) {
tableInfo
}

private def getBadRecordsPath(tablePropertiesMap: util.HashMap[String, String],
tableName: String,
tableId: String,
databaseName: String): String = {
val badRecordsPath = tablePropertiesMap
.getOrDefault("bad_records_path",
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC))
if (badRecordsPath == null || badRecordsPath.isEmpty) {
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
} else {
badRecordsPath + CarbonCommonConstants.FILE_SEPARATOR + databaseName +
CarbonCommonConstants.FILE_SEPARATOR + s"${tableName}_$tableId"
}
}

/**
* Method to check to get the encoder from parent or not
* @param field column field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ case class CarbonLoadDataCommand(
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))

optionsFinal
.put("bad_record_path",
tableProperties
.getOrDefault("bad_records_path",
CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))

val factPath = if (dataFrame.isDefined) {
""
} else {
Expand Down
Loading

0 comments on commit 8e541d6

Please sign in to comment.