diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b17969bab85..d5c3c849266 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -191,7 +191,12 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
         if (isUpdateTableStatusRequired) {
           CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
-        throw ex
+        val errorMessage = operationContext.getProperty("Error message")
+        if (errorMessage != null) {
+          throw new RuntimeException(errorMessage.toString, ex.getCause)
+        } else {
+          throw ex
+        }
     }
     Seq.empty
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index f574e127f2c..5c461272c76 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1064,6 +1064,7 @@ object CommonLoadUtils {
       if (loadParams.updateModel.isDefined) {
         CarbonScalaUtil.updateErrorInUpdateModel(loadParams.updateModel.get, executorMessage)
       }
+      loadParams.operationContext.setProperty("Error message", errorMessage)
       LOGGER.info(errorMessage)
       LOGGER.error(ex)
       throw ex
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index c19c51e83c5..488291dc247 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -219,6 +219,25 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
     }
   }
 
+  test("test load with partition column having bad record value") {
+    sql("drop table if exists dataloadOptionTests")
+    sql("CREATE TABLE dataloadOptionTests (empno int, empname String, designation String, " +
+        "workgroupcategory int, workgroupcategoryname String, deptno int, projectjoindate " +
+        "Timestamp, projectenddate Date,attendance int,utilization int,salary int) PARTITIONED BY " +
+        "(deptname String,doj Timestamp,projectcode int) STORED AS carbondata ")
+    val csvFilePath = s"$resourcesPath/data.csv"
+    val ex = intercept[RuntimeException] {
+      sql("LOAD DATA local inpath '" + csvFilePath +
+          "' INTO TABLE dataloadOptionTests OPTIONS ('bad_records_action'='FAIL', 'DELIMITER'= '," +
+          "', 'QUOTECHAR'= '\"', 'dateformat'='DD-MM-YYYY','timestampformat'='DD-MM-YYYY')")
+    }
+    assert(ex.getMessage.contains(
+      "DataLoad failure: Data load failed due to bad record: The value with column name " +
+      "projectjoindate and column data type TIMESTAMP is not a valid TIMESTAMP type.Please " +
+      "enable bad record logger to know the detail reason."))
+    sql("drop table dataloadOptionTests")
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS sales")
     sql("drop table IF EXISTS serializable_values")