From a949dfc40428178891a099b85c59b1a6e1391041 Mon Sep 17 00:00:00 2001 From: Indhumathi27 Date: Fri, 16 Nov 2018 21:49:16 +0530 Subject: [PATCH] [CARBONDATA-3106] WrittenbyAPI not serialized in executor with globalsort --- .../spark/load/DataLoadProcessBuilderOnSpark.scala | 5 ++--- .../spark/load/DataLoadProcessorStepOnSpark.scala | 6 +++++- .../store/writer/v3/CarbonFactDataWriterImplV3.java | 10 +++++++--- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 338180dc1a4..8ded6bd1147 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -66,9 +66,8 @@ object DataLoadProcessBuilderOnSpark { val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - sparkSession.sparkContext.appName) + hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) // 1. Input diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 0a68fb0ae6c..2ca47b39497 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.util.ThreadLocalSessionInfo +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations} import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException @@ -228,6 +229,9 @@ object DataLoadProcessorStepOnSpark { modelBroadcast: Broadcast[CarbonLoadModel], rowCounter: Accumulator[Int], conf: Configuration) { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)) ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf) var model: CarbonLoadModel = null var tableName: String = null diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index f16879649ec..ccbc5446dc6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -104,9 +104,13 @@ protected void writeFooterToFile() throws CarbonDataWriterException { .convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality, thriftColumnSchemaList.size()); convertFileMeta.setIs_sort(isSorted); - convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, - CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)); + String appName = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME); + if (appName == null) { + throw new CarbonDataWriterException( + "DataLoading failed as CARBON_WRITTEN_BY_APPNAME is null"); + } + convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, appName); convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION, CarbonVersionConstants.CARBONDATA_VERSION); // fill the carbon index details