[SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration

## What changes were proposed in this pull request?

In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the Hadoop configurations specified in the Spark session configuration take effect.

This PR adds a Scalastyle rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent such usage.
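
For illustration (not part of this patch), the difference matters whenever a Hadoop setting is supplied through the session configuration: `spark.sessionState.newHadoopConf()` copies the SparkContext's Hadoop configuration and then layers the session's SQL configuration entries on top, whereas `sparkContext.hadoopConfiguration` only sees the shared, context-level settings. A hypothetical test sketch in the style of the suites touched by this patch (the suite name and the key `hypothetical.hadoop.key` are made up):

```scala
package org.apache.spark.sql

import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

// Hypothetical suite; only illustrates why newHadoopConf() is preferred.
class HadoopConfVisibilitySuite extends QueryTest with SharedSQLContext with SQLTestUtils {

  test("session-level Hadoop settings are only visible via newHadoopConf()") {
    withSQLConf("hypothetical.hadoop.key" -> "session-value") {
      // The session-scoped value is applied on top of the SparkContext conf here ...
      assert(spark.sessionState.newHadoopConf().get("hypothetical.hadoop.key") == "session-value")

      // ... but the shared SparkContext configuration never sees it.
      // scalastyle:off hadoopconfiguration
      assert(spark.sparkContext.hadoopConfiguration.get("hypothetical.hadoop.key") == null)
      // scalastyle:on hadoopconfiguration
    }
  }
}
```
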
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21873 from gengliangwang/linterRule.
gengliangwang authored and gatorsmile committed Jul 26, 2018
1 parent 2c82745 commit fa09d91
Showing 8 changed files with 45 additions and 37 deletions.
@@ -638,12 +638,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     intercept[FileNotFoundException] {
       withTempPath { dir =>
         FileUtils.touch(new File(dir, "test"))
-        val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-        try {
-          hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
           spark.read.format("avro").load(dir.toString)
-        } finally {
-          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
         }
       }
     }
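
For context, `withSQLConf` is a helper from Spark's SQL test utilities: it sets the given keys in the session configuration for the duration of the block and restores the previous values afterwards, which is what makes the manual `try { hadoopConf.set(...) } finally { hadoopConf.unset(...) }` bookkeeping above unnecessary. A simplified, standalone sketch of the pattern (not Spark's actual implementation or signature):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative set-run-restore helper; Spark's real withSQLConf is a method
// on its SQL test traits and takes only the key/value pairs and the body.
def withSQLConf[T](spark: SparkSession)(pairs: (String, String)*)(body: => T): T = {
  val conf = spark.conf
  // Remember the current values so they can be restored afterwards.
  val previous = pairs.map { case (key, _) => key -> conf.getOption(key) }
  pairs.foreach { case (key, value) => conf.set(key, value) }
  try {
    body
  } finally {
    previous.foreach {
      case (key, Some(oldValue)) => conf.set(key, oldValue)
      case (key, None) => conf.unset(key)
    }
  }
}
```
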
@@ -717,15 +713,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
       Files.createFile(new File(tempSaveDir, "non-avro").toPath)
 
-      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-      val count = try {
-        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+      withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
         val newDf = spark.read.format("avro").load(tempSaveDir)
-        newDf.count()
-      } finally {
-        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        assert(newDf.count() == 8)
       }
-      assert(count == 8)
     }
   }

@@ -888,20 +879,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         Paths.get(new URL(episodesAvro).toURI),
         Paths.get(dir.getCanonicalPath, "episodes"))
 
-      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-      val count = try {
-        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
         val newDf = spark
           .read
           .option("ignoreExtension", "true")
           .format("avro")
           .load(s"${dir.getCanonicalPath}/episodes")
-        newDf.count()
-      } finally {
-        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        assert(newDf.count() == 8)
       }
-
-      assert(count == 8)
     }
   }
 }

@@ -38,7 +38,9 @@ private object RecursiveFlag {
    */
   def withRecursiveFlag[T](value: Boolean, spark: SparkSession)(f: => T): T = {
     val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+    // scalastyle:off hadoopconfiguration
     val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
     val old = Option(hadoopConf.get(flagName))
     hadoopConf.set(flagName, value.toString)
     try f finally {

@@ -98,7 +100,9 @@ private object SamplePathFilter {
     val sampleImages = sampleRatio < 1
     if (sampleImages) {
       val flagName = FileInputFormat.PATHFILTER_CLASS
+      // scalastyle:off hadoopconfiguration
       val hadoopConf = spark.sparkContext.hadoopConfiguration
+      // scalastyle:on hadoopconfiguration
       val old = Option(hadoopConf.getClass(flagName, null))
       hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
       hadoopConf.setLong(SamplePathFilter.seedParam, seed)

@@ -285,7 +285,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
     // There should be 1 checkpoint remaining.
     assert(model.getCheckpointFiles.length === 1)
     val checkpointFile = new Path(model.getCheckpointFiles.head)
-    val fs = checkpointFile.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    val fs = checkpointFile.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointFile))
     model.deleteCheckpointFiles()
     assert(model.getCheckpointFiles.isEmpty)

scalastyle-config.xml (13 additions, 0 deletions)

@@ -150,6 +150,19 @@ This file is divided into 3 sections:
     // scalastyle:on println]]></customMessage>
   </check>
 
+  <check customId="hadoopconfiguration" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">spark(.sqlContext)?.sparkContext.hadoopConfiguration</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use sparkContext.hadoopConfiguration? In most cases, you should use
+      spark.sessionState.newHadoopConf() instead, so that the hadoop configurations specified in Spark session
+      configuration will come into effect.
+      If you must use sparkContext.hadoopConfiguration, wrap the code block with
+      // scalastyle:off hadoopconfiguration
+      spark.sparkContext.hadoopConfiguration...
+      // scalastyle:on hadoopconfiguration
+    ]]></customMessage>
+  </check>
+
   <check customId="visiblefortesting" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">@VisibleForTesting</parameter></parameters>
     <customMessage><![CDATA[
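
To make the new rule concrete, here is an illustrative sketch (hypothetical variable names) of what the regex flags and what it lets through:

```scala
// Hypothetical lines in a Spark .scala source file.

// Flagged by the "hadoopconfiguration" rule:
val c1 = spark.sparkContext.hadoopConfiguration
val c2 = spark.sqlContext.sparkContext.hadoopConfiguration

// Not flagged: goes through the session state and picks up session-level settings.
val c3 = spark.sessionState.newHadoopConf()

// Not flagged: deliberate use of the shared conf, wrapped in the suppression comments.
// scalastyle:off hadoopconfiguration
val c4 = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
```
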
@@ -38,7 +38,7 @@ class HadoopFileLinesReaderSuite extends SharedSQLContext {
 
     val lines = ranges.map { case (start, length) =>
       val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length)
-      val hadoopConf = conf.getOrElse(spark.sparkContext.hadoopConfiguration)
+      val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf())
       val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)
 
       reader.map(_.toString)

@@ -111,20 +111,20 @@ class HadoopFileLinesReaderSuite extends SharedSQLContext {
   }
 
   test("io.file.buffer.size is less than line length") {
-    val conf = spark.sparkContext.hadoopConfiguration
-    conf.set("io.file.buffer.size", "2")
-    withTempPath { path =>
-      val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)))
-      assert(lines == Seq("123456"))
+    withSQLConf("io.file.buffer.size" -> "2") {
+      withTempPath { path =>
+        val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)))
+        assert(lines == Seq("123456"))
+      }
     }
   }
 
   test("line cannot be longer than line.maxlength") {
-    val conf = spark.sparkContext.hadoopConfiguration
-    conf.set("mapreduce.input.linerecordreader.line.maxlength", "5")
-    withTempPath { path =>
-      val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)))
-      assert(lines == Seq("1234"))
+    withSQLConf("mapreduce.input.linerecordreader.line.maxlength" -> "5") {
+      withTempPath { path =>
+        val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)))
+        assert(lines == Seq("1234"))
+      }
     }
   }
 

@@ -783,7 +783,7 @@ class HiveDDLSuite
       val part1 = Map("a" -> "1", "b" -> "5")
       val part2 = Map("a" -> "2", "b" -> "6")
       val root = new Path(catalog.getTableMetadata(tableIdent).location)
-      val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
       // valid
       fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
       fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file

@@ -1177,13 +1177,18 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
       assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
     }
 
-    val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+    // Turn off style check since the following test is to modify hadoop configuration on purpose.
+    // scalastyle:off hadoopconfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
+
+    val originalValue = hadoopConf.get(modeConfKey, "nonstrict")
     try {
-      spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+      hadoopConf.set(modeConfKey, "nonstrict")
       sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
       assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
     } finally {
-      spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+      hadoopConf.set(modeConfKey, originalValue)
     }
   }
 }

@@ -2053,7 +2053,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
     deleteOnExitField.setAccessible(true)
 
-    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
     val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
 
     val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
