[KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable

### _Why are the changes needed?_

[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) makes MR job IDs consistent between FileBatchWriter and FileFormatWriter in Apache Spark 3.3.2, but it introduces a serialization problem: `JobID` is not serializable.

This PR rewrites `FileWriterFactory` to work around the problem, keeping a serializable `jobTrackerId` string instead of a non-serializable `JobID` field (see the sketch below).
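
For context, here is a minimal, self-contained sketch of the failure mode, assuming nothing beyond Hadoop's `JobID` class; `BrokenFactory`, `WorkingFactory`, and `SerializationCheck` are hypothetical names, not code from this patch. Holding a `JobID` field makes the factory fail Java serialization when Spark ships it to executors, while holding only the job-tracker-id string and rebuilding the `JobID` per task does not:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.mapred.JobID

// Broken shape: JobID implements Hadoop's Writable, not java.io.Serializable,
// so Java-serializing a factory that holds it as a field fails.
case class BrokenFactory(jobId: JobID)

// Working shape: keep only the String tracker id (serializable) and
// materialize the JobID lazily, when a writer is created on the executor.
case class WorkingFactory(jobTrackerId: String) {
  def jobIdFor(jobNumber: Int): JobID = new JobID(jobTrackerId, jobNumber)
}

object SerializationCheck extends App {
  // Same "yyyyMMddHHmmss" shape as the tracker id Spark derives from a Date.
  val trackerId = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date)

  def roundTrip(obj: AnyRef): Unit =
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)

  roundTrip(WorkingFactory(trackerId)) // succeeds: only a String field
  try roundTrip(BrokenFactory(new JobID(trackerId, 0)))
  catch { case e: NotSerializableException => println(s"fails as expected: $e") }
}
```

The actual change below follows the same idea: `FileWriterFactory` stores `jobTrackerId` and only builds the `JobID` inside `createTaskAttemptContext`, on the task side.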

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4359 from Yikf/FileWriterFactory.

Closes #4359

dd8c90f [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
1e5164e [Yikf] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory

Lead-authored-by: Yikf <yikaifei@apache.org>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
yikf and pan3793 committed Feb 18, 2023
1 parent e60855f commit 4feb83d
Showing 4 changed files with 89 additions and 4 deletions.
@@ -0,0 +1,78 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.spark.connector.hive.write

import java.util.Date

import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWriter, SingleDirectoryDataWriter, WriteJobDescription}
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.sparkHadoopWriterUtils

/**
 * This class is rewritten because of SPARK-42478, which affects Spark 3.3.2
 */
case class FileWriterFactory(
    description: WriteJobDescription,
    committer: FileCommitProtocol) extends DataWriterFactory {

  private val jobTrackerId = sparkHadoopWriterUtils.createJobTrackerID(new Date)

  override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
    val taskAttemptContext = createTaskAttemptContext(partitionId)
    committer.setupTask(taskAttemptContext)
    if (description.partitionColumns.isEmpty) {
      new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
    } else {
      new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
    }
  }

  private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
    val jobId = createJobID(jobTrackerId, 0)
    val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
    val taskAttemptId = new TaskAttemptID(taskId, 0)
    // Set up the configuration object
    val hadoopConf = description.serializableHadoopConf.value
    hadoopConf.set("mapreduce.job.id", jobId.toString)
    hadoopConf.set("mapreduce.task.id", taskId.toString)
    hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
    hadoopConf.setBoolean("mapreduce.task.ismap", true)
    hadoopConf.setInt("mapreduce.task.partition", 0)

    new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
  }

  /**
   * Create a job ID.
   *
   * @param jobTrackerID unique job track id
   * @param id job number
   * @return a job ID
   */
  def createJobID(jobTrackerID: String, id: Int): JobID = {
    if (id < 0) {
      throw new IllegalArgumentException("Job number is negative")
    }
    new JobID(jobTrackerID, id)
  }
}
@@ -23,12 +23,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.WriteTaskResult
+import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
 import org.apache.spark.sql.types.StringType
@@ -47,10 +48,12 @@ class HiveBatchWrite(
     ifPartitionNotExists: Boolean,
     hadoopConf: Configuration,
     fileBatchWrite: FileBatchWrite,
-    externalCatalog: ExternalCatalog) extends BatchWrite with Logging {
+    externalCatalog: ExternalCatalog,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends BatchWrite with Logging {
 
   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
-    fileBatchWrite.createBatchWriterFactory(info)
+    FileWriterFactory(description, committer)
   }
 
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
@@ -112,7 +112,9 @@ case class HiveWrite(
       ifPartitionNotExists,
       hadoopConf,
       new FileBatchWrite(job, description, committer),
-      externalCatalog)
+      externalCatalog,
+      description,
+      committer)
   }
 
   private def createWriteJobDescription(
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.kyuubi.connector
 import scala.collection.mutable
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
@@ -43,6 +44,7 @@ object HiveBridgeHelper {
   val hive = org.apache.spark.sql.hive.client.hive
   val logicalExpressions: LogicalExpressions.type = LogicalExpressions
   val hiveClientImpl: HiveClientImpl.type = HiveClientImpl
+  val sparkHadoopWriterUtils: SparkHadoopWriterUtils.type = SparkHadoopWriterUtils
   val catalogV2Util: CatalogV2Util.type = CatalogV2Util
   val hiveTableUtil: HiveTableUtil.type = HiveTableUtil
   val hiveShim: HiveShim.type = HiveShim
