Skip to content

Commit

Permalink
[CARBONDATA-3025]handle passing spark appname for partition table and…
Browse files Browse the repository at this point in the history
… file format

Changes in this PR

1. Data load with a partition table using the file format fails, because the app name is not present in CarbonProperties on the executor.
2. This PR sets the Spark app name in CarbonProperties, which will be written to the CarbonData file footer.
3. The app name is set in the Hadoop conf, and the executor then reads it from that conf and sets it in CarbonProperties.
Instead of hardcoding the Spark property key ("spark.app.name"), the exposed API `SparkContext.appName` is used.

This closes #2861
  • Loading branch information
akashrn5 authored and jackylk committed Oct 31, 2018
1 parent 94a4f83 commit 6258447
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext contex
public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
final TaskAttemptContext taskAttemptContext) throws IOException {
final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
String appName =
taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
if (null != appName) {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName);
}
//if loadModel having taskNo already(like in SDK) then no need to overwrite
short sdkWriterCores = loadModel.getSdkWriterCores();
int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object DataLoadProcessBuilderOnSpark {

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sparkContext.getConf.get("spark.app.name"))
sparkSession.sparkContext.appName)

val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
// 1. Input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ abstract class CarbonRDD[T: ClassTag](
@transient private val ss: SparkSession,
@transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {

@transient val sparkAppName: String = ss.sparkContext.appName
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkAppName)

val carbonSessionInfo: CarbonSessionInfo = {
var info = ThreadLocalSessionInfo.getCarbonSessionInfo
if (info == null || info.getSessionParams == null) {
Expand All @@ -57,11 +61,6 @@ abstract class CarbonRDD[T: ClassTag](

protected def internalGetPartitions: Array[Partition]


CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
ss.sparkContext.getConf.get("spark.app.name"))

override def getPartitions: Array[Partition] = {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
internalGetPartitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,11 @@ class SparkCarbonFileFormat extends FileFormat
dataSchema: StructType): OutputWriterFactory = {

val conf = job.getConfiguration

conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
model.setLoadWithoutConverterStep(true)
CarbonTableOutputFormat.setLoadModel(conf, model)

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sparkContext.getConf.get("spark.app.name"))

new OutputWriterFactory {
override def newInstance(
path: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ with Serializable {
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
model.setLoadWithoutConverterStep(true)
carbonProperty
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sparkContext.getConf.get("spark.app.name"))

val staticPartition = options.getOrElse("staticpartition", null)
if (staticPartition != null) {
conf.set("carbon.staticpartition", staticPartition)
Expand Down Expand Up @@ -159,6 +155,7 @@ with Serializable {
if (updateTimeStamp.isDefined) {
conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
}
conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
CarbonTableOutputFormat.setLoadModel(conf, model)

new OutputWriterFactory {
Expand All @@ -175,6 +172,9 @@ with Serializable {
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName)
val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
Expand Down

0 comments on commit 6258447

Please sign in to comment.