[SPARK-10330] Add Scalastyle rule to require use of SparkHadoopUtil JobContext methods #8521

Closed · wants to merge 3 commits
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -858,7 +858,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
Review comment (member):
@JoshRosen I didn't change job.getConfiguration last time because those call sites use org.apache.hadoop.mapreduce.Job, whose getConfiguration is compatible across Hadoop versions. However, +1 for this change since it is good for maintainability.
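For illustration, a minimal sketch of the call-site pattern this change standardizes on. This is not code from the PR; the object name, the Job construction, and the configuration value are assumptions for the example.

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.deploy.SparkHadoopUtil

object JobContextExample {
  def main(args: Array[String]): Unit = {
    val job = new Job() // deprecated in Hadoop 2.x, but present in both 1.x and 2.x
    // Through a JobContext-typed reference, getConfiguration resolves to a class method in
    // Hadoop 1.x but an interface method in Hadoop 2.x, so a direct call produces different
    // byte code per version; the helper looks the method up reflectively at runtime, and this
    // PR routes even Job call sites through it for consistency.
    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
    conf.set("mapred.output.dir", "/tmp/example-output") // hypothetical path
  }
}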

new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -910,7 +910,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1092,7 +1092,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updatedConf = job.getConfiguration
val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}

@@ -192,7 +192,9 @@ class SparkHadoopUtil extends Logging {
* while it's interface in Hadoop 2.+.
*/
def getConfigurationFromJobContext(context: JobContext): Configuration = {
// scalastyle:off jobcontext
val method = context.getClass.getMethod("getConfiguration")
// scalastyle:on jobcontext
method.invoke(context).asInstanceOf[Configuration]
}

@@ -204,7 +206,9 @@
*/
def getTaskAttemptIDFromTaskAttemptContext(
context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
// scalastyle:off jobcontext
val method = context.getClass.getMethod("getTaskAttemptID")
// scalastyle:on jobcontext
method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
}
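As a usage sketch (the object and helper names below are made up, not part of this diff), a caller on the write path would go through the new accessor like this:

import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
import org.apache.spark.deploy.SparkHadoopUtil

object TaskAttemptIdExample {
  // Hypothetical helper: derive a per-attempt suffix without calling
  // context.getTaskAttemptID directly, so the same byte code runs whether
  // TaskAttemptContext is the Hadoop 1.x class or the Hadoop 2.x interface.
  def attemptSuffix(context: MapReduceTaskAttemptContext): String =
    SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context).toString
}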

@@ -934,8 +934,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
job.getConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(job.getConfiguration)
val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
jobConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}

/**
@@ -1002,7 +1003,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
val wrappedConf = new SerializableConfiguration(job.getConfiguration)
val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val wrappedConf = new SerializableConfiguration(jobConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

@@ -86,7 +86,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
if (isDriverSide) {
initDriverSideJobFuncOpt.map(f => f(job))
}
job.getConfiguration
SparkHadoopUtil.get.getConfigurationFromJobContext(job)
}

private val jobTrackerId: String = {
6 changes: 4 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.{File, FileWriter}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.PortableDataStream
import org.apache.spark.storage.StorageLevel

@@ -506,8 +507,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}

@@ -16,6 +16,7 @@
*/

// scalastyle:off println
// scalastyle:off jobcontext
package org.apache.spark.examples

import java.nio.ByteBuffer
@@ -81,6 +82,7 @@ object CassandraCQLTest {

val job = new Job()
job.setInputFormatClass(classOf[CqlPagingInputFormat])
val configuration = job.getConfiguration
ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
@@ -135,3 +137,4 @@ object CassandraCQLTest {
}
}
// scalastyle:on println
// scalastyle:on jobcontext
@@ -16,6 +16,7 @@
*/

// scalastyle:off println
// scalastyle:off jobcontext
package org.apache.spark.examples

import java.nio.ByteBuffer
@@ -130,6 +131,7 @@ object CassandraTest {
}
}
// scalastyle:on println
// scalastyle:on jobcontext

/*
create keyspace casDemo;
8 changes: 8 additions & 0 deletions scalastyle-config.xml
@@ -168,6 +168,14 @@ This file is divided into 3 sections:
scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
</check>

<!-- As of SPARK-10330 JobContext methods should not be called directly -->
<check customId="jobcontext" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
<parameters><parameter name="regex">^getConfiguration$|^getTaskAttemptID$</parameter></parameters>
<customMessage>Instead of calling .getConfiguration() or .getTaskAttemptID() directly,
use SparkHadoopUtil's getConfigurationFromJobContext() and getTaskAttemptIDFromTaskAttemptContext() methods.
</customMessage>
</check>

<!-- ================================================================================ -->
<!-- rules we'd like to enforce, but haven't cleaned up the codebase yet -->
<!-- ================================================================================ -->
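When a direct call cannot be avoided (inside the reflective helpers themselves, or around third-party APIs such as Parquet's ContextUtil), the rule is silenced locally with scalastyle comment markers, as several files in this PR do. Below is a self-contained sketch of that escape hatch; the object and method names are hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.JobContext

object JobContextEscapeHatch {
  // The new "jobcontext" check is disabled only around the deliberate direct call.
  def directConfiguration(context: JobContext): Configuration = {
    // scalastyle:off jobcontext
    context.getConfiguration
    // scalastyle:on jobcontext
  }
}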
@@ -47,7 +47,8 @@ private[sql] abstract class BaseWriterContainer(

protected val dataSchema = relation.dataSchema

protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
protected val serializableConf =
new SerializableConfiguration(SparkHadoopUtil.get.getConfigurationFromJobContext(job))

// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
@@ -89,7 +90,8 @@ private[sql] abstract class BaseWriterContainer(
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
SparkHadoopUtil.get.getConfigurationFromJobContext(job).
set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)

// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
@@ -182,7 +184,9 @@ private[sql] abstract class BaseWriterContainer(
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
// scalastyle:off jobcontext
this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
// scalastyle:on jobcontext
}

private def setupConf(): Unit = {
@@ -81,7 +81,7 @@ private[sql] class JSONRelation(

private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
val conf = job.getConfiguration
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)

val paths = inputPaths.map(_.getPath)

@@ -71,7 +71,11 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with

// Called before `prepareForRead()` when initializing Parquet record reader.
override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration
val conf = {
// scalastyle:off jobcontext
context.getConfiguration
// scalastyle:on jobcontext
}

// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its metadata.
@@ -53,7 +53,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
override def setupTask(taskContext: TaskAttemptContext): Unit = {}

override def commitJob(jobContext: JobContext) {
val configuration = ContextUtil.getConfiguration(jobContext)
val configuration = {
// scalastyle:off jobcontext
ContextUtil.getConfiguration(jobContext)
// scalastyle:on jobcontext
}
val fileSystem = outputPath.getFileSystem(configuration)

if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
@@ -211,7 +211,11 @@ private[sql] class ParquetRelation(
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum

override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)
val conf = {
// scalastyle:off jobcontext
ContextUtil.getConfiguration(job)
// scalastyle:on jobcontext
}

// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
@@ -528,7 +532,7 @@ private[sql] object ParquetRelation extends Logging {
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean,
followParquetFormatSpec: Boolean)(job: Job): Unit = {
val conf = job.getConfiguration
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)

// Try to push down filters when filter push-down is enabled.
@@ -572,7 +576,7 @@ private[sql] object ParquetRelation extends Logging {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
overrideMinSplitSize(parquetBlockSize, SparkHadoopUtil.get.getConfigurationFromJobContext(job))
}

private[parquet] def readSchema(
@@ -123,7 +123,11 @@ private[parquet] object ParquetTypesConverter extends Logging {
throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
}
val job = new Job()
val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
val conf = {
// scalastyle:off jobcontext
configuration.getOrElse(ContextUtil.getConfiguration(job))
// scalastyle:on jobcontext
}
val fs: FileSystem = origPath.getFileSystem(conf)
if (fs == null) {
throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
@@ -203,7 +203,7 @@ private[sql] class OrcRelation(
}

override def prepareJobForWrite(job: Job): OutputWriterFactory = {
job.getConfiguration match {
SparkHadoopUtil.get.getConfigurationFromJobContext(job) match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
case conf =>
@@ -284,7 +284,7 @@ private[orc] case class OrcTableScan(

def execute(): RDD[InternalRow] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
val conf = job.getConfiguration
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)

// Tries to push down filters if ORC filter push-down is enabled
if (sqlContext.conf.orcFilterPushDown) {