Skip to content

Commit

Permalink
[CARBONDATA-3106] WrittenbyAPI not serialized in executor with global…
Browse files Browse the repository at this point in the history
…sort

Problem:
Written_By_APPNAME when added in carbonproperty is not serialized in executor with global sort

Solution:
Add Written_by_APPNAME in hadoop conf and in executor side get it from configuration and add to carbonproperty

This closes #2928
  • Loading branch information
Indhumathi27 authored and ravipesala committed Nov 21, 2018
1 parent 6df965b commit 2e0153b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
Expand Up @@ -66,9 +66,8 @@ object DataLoadProcessBuilderOnSpark {
val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sparkContext.appName)
hadoopConf
.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)

val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
// 1. Input
Expand Down
Expand Up @@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
Expand Down Expand Up @@ -228,6 +229,9 @@ object DataLoadProcessorStepOnSpark {
modelBroadcast: Broadcast[CarbonLoadModel],
rowCounter: Accumulator[Int],
conf: Configuration) {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME))
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
var model: CarbonLoadModel = null
var tableName: String = null
Expand Down
Expand Up @@ -104,9 +104,13 @@ protected void writeFooterToFile() throws CarbonDataWriterException {
.convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
thriftColumnSchemaList.size());
convertFileMeta.setIs_sort(isSorted);
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO,
CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME));
String appName = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
if (appName == null) {
throw new CarbonDataWriterException(
"DataLoading failed as CARBON_WRITTEN_BY_APPNAME is null");
}
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, appName);
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION,
CarbonVersionConstants.CARBONDATA_VERSION);
// fill the carbon index details
Expand Down

0 comments on commit 2e0153b

Please sign in to comment.