[CARBONDATA-3458] Setting Spark Execution Id to null only for Spark version 2.2 and below

Problem: EXECUTION_ID should not be set to null in Spark 2.3.

Solution: Set EXECUTION_ID to null only in Spark version 2.2 and below. From version 2.3 onwards, EXECUTION_ID is set by Spark itself.
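
For context, a minimal sketch of the pre-2.3 behaviour being worked around (not part of this commit; assumes a local SparkSession named `spark`): on Spark 2.2 and below, SQLExecution.withNewExecutionId throws "spark.sql.execution.id is already set" when an execution id is still attached to the calling thread, so Carbon cleared the thread-local property before triggering a new action.

    import org.apache.spark.sql.SparkSession

    object ExecutionIdRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("repro").getOrCreate()
        // Literal value of SQLExecution.EXECUTION_ID_KEY.
        val executionIdKey = "spark.sql.execution.id"
        // Simulate a stale execution id left behind on the thread.
        spark.sparkContext.setLocalProperty(executionIdKey, "0")
        // On Spark 2.2 and below the next action would fail with
        // "spark.sql.execution.id is already set" unless the property is
        // cleared first; on 2.3+ Spark manages the id itself.
        spark.sparkContext.setLocalProperty(executionIdKey, null)
        spark.range(1).collect()
        spark.stop()
      }
    }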

This closes #3313
shivamasn authored and kunal642 committed Jul 8, 2019
1 parent c179195 commit 8f0ec97
Showing 3 changed files with 15 additions and 6 deletions.
SparkUtil.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.util
 
 import org.apache.spark.{SPARK_VERSION, TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 
 /*
  * this object use to handle file splits
@@ -57,4 +59,12 @@ object SparkUtil
     isSparkVersionXandAbove(xVersion, true)
   }
 
+  def setNullExecutionId(sparkSession: SparkSession): Unit = {
+    // "spark.sql.execution.id is already set" exception will be
+    // thrown if not set to null in spark2.2 and below versions
+    if (!SparkUtil.isSparkVersionXandAbove("2.3")) {
+      sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    }
+  }
+
 }
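
The guard relies on the isSparkVersionXandAbove helper already present in SparkUtil (its body lies outside this hunk). A rough stand-alone sketch of the intended semantics, assuming SPARK_VERSION strings such as "2.2.1" or "2.3.2" (`sparkVersionAtLeast` is a hypothetical name, not Carbon's API):

    import org.apache.spark.SPARK_VERSION

    // Compare only the major.minor part of the running Spark version.
    def sparkVersionAtLeast(xVersion: String): Boolean = {
      val Array(major, minor) = SPARK_VERSION.split("\\.").take(2).map(_.toInt)
      val Array(xMajor, xMinor) = xVersion.split("\\.").take(2).map(_.toInt)
      major > xMajor || (major == xMajor && minor >= xMinor)
    }

    // setNullExecutionId clears the property only when this is false for "2.3",
    // i.e. only on Spark 2.2 and below.
    val shouldClear = !sparkVersionAtLeast("2.3")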
CarbonLoadDataCommand.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.hive.CarbonRelation
@@ -46,7 +45,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils, SparkUtil}
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -837,7 +836,7 @@ case class CarbonLoadDataCommand(
         query = logicalPlan,
         overwrite = false,
         ifPartitionNotExists = false)
-      sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+      SparkUtil.setNullExecutionId(sparkSession)
       Dataset.ofRows(sparkSession, convertedPlan)
     } catch {
       case ex: Throwable =>
@@ -1056,7 +1055,7 @@
       catalogTable: CatalogTable,
       df: DataFrame,
       carbonLoadModel: CarbonLoadModel): LogicalPlan = {
-    sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    SparkUtil.setNullExecutionId(sparkSession)
    // In case of update, we don't need the segmrntid column in case of partitioning
    val dropAttributes = df.logicalPlan.output.dropRight(1)
    val finalOutput = catalogTable.schema.map { attr =>
CarbonCreateTableCommand.scala
@@ -22,8 +22,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -122,7 +122,7 @@ case class CarbonCreateTableCommand(
     val tablePath = tableIdentifier.getTablePath
     val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
     val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
-    sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    SparkUtil.setNullExecutionId(sparkSession)
     val partitionInfo = tableInfo.getFactTable.getPartitionInfo
     val partitionString =
       if (partitionInfo != null &&
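
One quick way to sanity-check the new helper (a hypothetical snippet, not part of this commit; assumes a local SparkSession and the patched SparkUtil on the classpath):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.SparkUtil

    val spark = SparkSession.builder().master("local[1]").appName("check").getOrCreate()
    val executionIdKey = "spark.sql.execution.id" // value of SQLExecution.EXECUTION_ID_KEY
    spark.sparkContext.setLocalProperty(executionIdKey, "42")
    SparkUtil.setNullExecutionId(spark)
    // Expected: null on Spark 2.2 and below; "42" (untouched) on 2.3+.
    println(spark.sparkContext.getLocalProperty(executionIdKey))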
