From 1b256608a4acb410adbd77196a2c65dfcd0b1d27 Mon Sep 17 00:00:00 2001 From: akashrn5 Date: Fri, 26 Oct 2018 19:34:04 +0530 Subject: [PATCH] handle passing spark appname for partition table and file format --- .../hadoop/api/CarbonTableOutputFormat.java | 6 ++++++ .../hadoop/ft/CarbonTableInputFormatTest.java | 2 -- .../hadoop/ft/CarbonTableOutputFormatTest.java | 2 -- .../datasources/SparkCarbonFileFormat.scala | 8 +++----- .../datasources/SparkCarbonTableFormat.scala | 13 +++++++++---- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index f0f285825fc..0bcd7e19b30 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -236,6 +236,12 @@ public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext contex public RecordWriter getRecordWriter( final TaskAttemptContext taskAttemptContext) throws IOException { final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); + String appName = + taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME); + if (null != appName) { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName); + } //if loadModel having taskNo already(like in SDK) then no need to overwrite short sdkWriterCores = loadModel.getSdkWriterCores(); int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1; diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java index d379d330f43..136d3cc774a 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java @@ -62,8 +62,6 @@ public class CarbonTableInputFormatTest { addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/"); - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "CarbonTableInputFormatTest"); try { creator = new StoreCreator(new File("target/store").getAbsolutePath(), new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()); diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java index 6fb72524794..379fdaf027e 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java @@ -53,8 +53,6 @@ public class CarbonTableOutputFormatTest { addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/"); - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "CarbonTableOutputFormatTest"); try { carbonLoadModel = new StoreCreator(new File("target/store").getAbsolutePath(), new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createTableAndLoadModel(); diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala index 719fa34f70f..ca5a87a9b84 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala @@ -121,15 +121,13 @@ class SparkCarbonFileFormat extends FileFormat dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - + conf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + sparkSession.sqlContext.getConf("spark.app.name")) val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema) model.setLoadWithoutConverterStep(true) CarbonTableOutputFormat.setLoadModel(conf, model) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - sparkSession.sparkContext.getConf.get("spark.app.name")) - new OutputWriterFactory { override def newInstance( path: String, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index 47d6a716119..a3501aa4ee4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -121,10 +121,6 @@ with Serializable { model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) model.setLoadWithoutConverterStep(true) - carbonProperty - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - sparkSession.sparkContext.getConf.get("spark.app.name")) - val staticPartition = options.getOrElse("staticpartition", null) if (staticPartition != null) { conf.set("carbon.staticpartition", staticPartition) @@ -159,6 +155,9 @@ with Serializable { if (updateTimeStamp.isDefined) { conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get) } + conf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + sparkSession.sqlContext.getConf("spark.app.name")) CarbonTableOutputFormat.setLoadModel(conf, model) new OutputWriterFactory { @@ -175,6 +174,12 @@ with Serializable { dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration) + val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME) + if (null != appName) { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + appName) + } val taskNumber = generateTaskNumber(path, context, model.getSegmentId) val storeLocation = CommonUtil.getTempStoreLocations(taskNumber) CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)