Setting execution_id to null for Spark version 2.2 and below
shivamasn committed Jul 2, 2019
1 parent 188e7e4 commit bd0b6f3
Showing 3 changed files with 15 additions and 6 deletions.
SparkUtil.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.util
 
 import org.apache.spark.{SPARK_VERSION, TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 
 /*
  * this object is used to handle file splits
@@ -57,4 +59,12 @@ object SparkUtil {
     isSparkVersionXandAbove(xVersion, true)
   }
 
+  // In Spark 2.2 and below, a "spark.sql.execution.id is already set"
+  // exception is thrown if the execution id is not reset to null.
+  def setSparkExecutionId(sparkSession: SparkSession, executionId: String): Unit = {
+    if (!SparkUtil.isSparkVersionXandAbove("2.3")) {
+      sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, executionId)
+    }
+  }
+
 }
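
The body of `isSparkVersionXandAbove` is collapsed in the diff above. A minimal sketch of how such a guard could compare the running Spark version against the requested one, assuming `major.minor[.patch]` version strings (the repository's actual implementation may differ):

```scala
import org.apache.spark.SPARK_VERSION

// Hypothetical sketch, not the repository's implementation: true when the
// running Spark version is at least xVersion, or exactly equal when
// isEqualComparision is set.
def isSparkVersionXandAbove(xVersion: String, isEqualComparision: Boolean = false): Boolean = {
  // Parse "major.minor[.patch]" into a comparable (major, minor) pair.
  def majorMinor(v: String): (Int, Int) = {
    val parts = v.split("\\.")
    (parts(0).toInt, parts(1).toInt)
  }
  val runtime = majorMinor(SPARK_VERSION)
  val target = majorMinor(xVersion)
  if (isEqualComparision) {
    runtime == target
  } else {
    runtime._1 > target._1 || (runtime._1 == target._1 && runtime._2 >= target._2)
  }
}
```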
CarbonLoadDataCommand.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.hive.CarbonRelation
@@ -46,7 +45,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils, SparkUtil}
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -837,7 +836,7 @@ case class CarbonLoadDataCommand(
       query = logicalPlan,
       overwrite = false,
       ifPartitionNotExists = false)
-    sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    SparkUtil.setSparkExecutionId(sparkSession, null)
     Dataset.ofRows(sparkSession, convertedPlan)
   } catch {
     case ex: Throwable =>
@@ -1056,7 +1055,7 @@
       catalogTable: CatalogTable,
       df: DataFrame,
       carbonLoadModel: CarbonLoadModel): LogicalPlan = {
-    sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    SparkUtil.setSparkExecutionId(sparkSession, null)
     // In case of update, we don't need the segmentid column when partitioning is used
     val dropAttributes = df.logicalPlan.output.dropRight(1)
     val finalOutput = catalogTable.schema.map { attr =>
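
For context, a hedged illustration (not part of the commit) of the failure this guards against: on Spark 2.2 and below, building a Dataset while `spark.sql.execution.id` is still set on the calling thread fails with "spark.sql.execution.id is already set", which is why the call sites above reset it before `Dataset.ofRows`:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, self-contained demo; the session setup is illustrative only.
object ExecutionIdDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("execution-id-demo")
      .getOrCreate()

    // Simulate an execution id left behind by an enclosing query; on
    // Spark 2.2 and below a nested query would then fail with
    // "spark.sql.execution.id is already set".
    spark.sparkContext.setLocalProperty("spark.sql.execution.id", "1")

    // What SparkUtil.setSparkExecutionId(spark, null) boils down to on old
    // versions: clear the property so the next query can set its own id.
    spark.sparkContext.setLocalProperty("spark.sql.execution.id", null)
    spark.range(10).count()

    spark.stop()
  }
}
```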
CarbonCreateTableCommand.scala
@@ -22,8 +22,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -122,7 +122,7 @@ case class CarbonCreateTableCommand(
     val tablePath = tableIdentifier.getTablePath
     val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
     val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
-    sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+    SparkUtil.setSparkExecutionId(sparkSession, null)
     val partitionInfo = tableInfo.getFactTable.getPartitionInfo
     val partitionString =
       if (partitionInfo != null &&
