more cleanup
marmbrus committed Aug 24, 2015
1 parent 4b6a6f4 commit f3b5d3a
Showing 5 changed files with 38 additions and 38 deletions.
@@ -35,12 +35,12 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq

protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = {
val fsPath = new Path(path)
- val fs = fsPath.getFileSystem(configuration)
+ val fs = fsPath.getFileSystem(hadoopConfiguration)
val parquetFiles = fs.listStatus(fsPath, new PathFilter {
override def accept(path: Path): Boolean = pathFilter(path)
}).toSeq

- val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
+ val footers = ParquetFileReader.readAllFootersInParallel(hadoopConfiguration, parquetFiles, true)
footers.head.getParquetMetadata.getFileMetaData.getSchema
}
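
For context, here is a minimal usage sketch of the helper above, assuming a suite that mixes in ParquetCompatibilityTest (so that withTempPath, sqlContext and readParquetSchema are in scope). The written data, the path filter and the field-count assertion are illustrative only and are not part of this commit:

import org.apache.hadoop.fs.Path

// Hypothetical usage: write a small Parquet dataset, then read its schema back
// through readParquetSchema, skipping non-data files such as _SUCCESS.
withTempPath { dir =>
  sqlContext.range(10).write.parquet(dir.getCanonicalPath)
  val schema = readParquetSchema(
    dir.getCanonicalPath,
    (p: Path) => !p.getName.startsWith("_"))
  assert(schema.getFieldCount === 1) // the single "id" column written above
}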

@@ -205,7 +205,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
test("compression codec") {
def compressionCodecFor(path: String): String = {
val codecs = ParquetTypesConverter
- .readMetaData(new Path(path), Some(configuration))
+ .readMetaData(new Path(path), Some(hadoopConfiguration))
.getBlocks
.flatMap(_.getColumns)
.map(_.getCodec.name())
@@ -276,14 +276,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
test("write metadata") {
withTempPath { file =>
val path = new Path(file.toURI.toString)
- val fs = FileSystem.getLocal(configuration)
+ val fs = FileSystem.getLocal(hadoopConfiguration)
val attributes = ScalaReflection.attributesFor[(Int, String)]
- ParquetTypesConverter.writeMetaData(attributes, path, configuration)
+ ParquetTypesConverter.writeMetaData(attributes, path, hadoopConfiguration)

assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))

- val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration))
+ val metaData = ParquetTypesConverter.readMetaData(path, Some(hadoopConfiguration))
val actualSchema = metaData.getFileMetaData.getSchema
val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)

@@ -367,62 +367,62 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}

test("SPARK-6352 DirectParquetOutputCommitter") {
- val clonedConf = new Configuration(configuration)
+ val clonedConf = new Configuration(hadoopConfiguration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
configuration.set("spark.sql.parquet.output.committer.class",
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
classOf[DirectParquetOutputCommitter].getCanonicalName)
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
- val fs = path.getFileSystem(configuration)
+ val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
- configuration.clear()
- clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ hadoopConfiguration.clear()
+ clonedConf.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}

test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
- val clonedConf = new Configuration(configuration)
+ val clonedConf = new Configuration(hadoopConfiguration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
configuration.set("spark.sql.parquet.output.committer.class",
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
"org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
- val fs = path.getFileSystem(configuration)
+ val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
- configuration.clear()
- clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ hadoopConfiguration.clear()
+ clonedConf.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}


test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
- val clonedConf = new Configuration(configuration)
+ val clonedConf = new Configuration(hadoopConfiguration)

- configuration.set(
+ hadoopConfiguration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName)

- configuration.set(
+ hadoopConfiguration.set(
"spark.sql.parquet.output.committer.class",
classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)

@@ -433,8 +433,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
assert(message === "Intentional exception for testing purposes")
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
- configuration.clear()
- clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ hadoopConfiguration.clear()
+ clonedConf.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}
@@ -452,11 +452,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}

test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
- val clonedConf = new Configuration(configuration)
+ val clonedConf = new Configuration(hadoopConfiguration)

// Using a output committer that always fail when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked.
- configuration.set(
+ hadoopConfiguration.set(
"spark.sql.parquet.output.committer.class",
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)

@@ -480,8 +480,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
- configuration.clear()
- clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ hadoopConfiguration.clear()
+ clonedConf.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}
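
Several of the tests above repeat the same save/mutate/restore dance around the shared Hadoop configuration: clone it, set a few keys, run the body, then clear it and copy the clone back (because Hadoop 1 has no Configuration.unset). A minimal standalone sketch of that idiom follows; the helper name withHadoopConf and its signature are assumptions made for illustration and are not part of this commit:

import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration

// Assumed helper: set the given keys on a shared Configuration, run the body,
// then restore the original state by clearing the config and copying back a clone.
def withHadoopConf(conf: Configuration)(keys: (String, String)*)(body: => Unit): Unit = {
  val cloned = new Configuration(conf)
  try {
    keys.foreach { case (k, v) => conf.set(k, v) }
    body
  } finally {
    conf.clear()
    cloned.foreach(entry => conf.set(entry.getKey, entry.getValue))
  }
}

With a helper like this, each try/finally block above would reduce to a single call such as withHadoopConf(hadoopConfiguration)("spark.sql.parquet.output.committer.class" -> committerClassName) { ... }.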
@@ -85,7 +85,7 @@ private[sql] trait SQLTestUtils
/**
* The Hadoop configuration used by the active [[SQLContext]].
*/
- protected def configuration: Configuration = {
+ protected def hadoopConfiguration: Configuration = {
sqlContext.sparkContext.hadoopConfiguration
}
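
This rename is the core of the commit: tests now reach the SparkContext-level Hadoop configuration through a name that says so, rather than the more ambiguous configuration. A hypothetical example of a suite relying on the renamed helper; the suite name and key are made up, and it assumes SQLTestUtils is mixed in, e.g. via SharedSQLContext as in the suites above:

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical suite: the helper should hand back the SparkContext's Hadoop config.
class HadoopConfExampleSuite extends QueryTest with SharedSQLContext {
  test("hadoopConfiguration exposes the SparkContext-level Hadoop config") {
    hadoopConfiguration.set("example.test.key", "value")
    assert(sqlContext.sparkContext.hadoopConfiguration.get("example.test.key") === "value")
  }
}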

@@ -123,7 +123,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
val summaryPath = new Path(path, "_metadata")
val commonSummaryPath = new Path(path, "_common_metadata")

- val fs = summaryPath.getFileSystem(configuration)
+ val fs = summaryPath.getFileSystem(hadoopConfiguration)
fs.delete(summaryPath, true)
fs.delete(commonSummaryPath, true)

@@ -504,17 +504,17 @@ abstract class HadoopFsRelationTest extends QueryTest with TestHiveSingleton wit
}

test("SPARK-8578 specified custom output committer will not be used to append data") {
- val clonedConf = new Configuration(configuration)
+ val clonedConf = new Configuration(hadoopConfiguration)
try {
val df = sqlContext.range(1, 10).toDF("i")
withTempPath { dir =>
df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
- configuration.set(
+ hadoopConfiguration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)
// Since Parquet has its own output committer setting, also set it
// to AlwaysFailParquetOutputCommitter at here.
configuration.set("spark.sql.parquet.output.committer.class",
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
classOf[AlwaysFailParquetOutputCommitter].getName)
// Because there data already exists,
// this append should succeed because we will use the output committer associated
@@ -533,12 +533,12 @@ abstract class HadoopFsRelationTest extends QueryTest with TestHiveSingleton wit
}
}
withTempPath { dir =>
- configuration.set(
+ hadoopConfiguration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)
// Since Parquet has its own output committer setting, also set it
// to AlwaysFailParquetOutputCommitter at here.
configuration.set("spark.sql.parquet.output.committer.class",
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
classOf[AlwaysFailParquetOutputCommitter].getName)
// Because there is no existing data,
// this append will fail because AlwaysFailOutputCommitter is used when we do append
@@ -549,8 +549,8 @@ abstract class HadoopFsRelationTest extends QueryTest with TestHiveSingleton wit
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
- configuration.clear()
- clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ hadoopConfiguration.clear()
+ clonedConf.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}

@@ -570,7 +570,7 @@ abstract class HadoopFsRelationTest extends QueryTest with TestHiveSingleton wit
}

test("SPARK-9899 Disable customized output committer when speculation is on") {
- val clonedConf = new Configuration(configuration)
+ val clonedConf = new Configuration(hadoopConfiguration)
val speculationEnabled =
sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)

@@ -580,7 +580,7 @@ abstract class HadoopFsRelationTest extends QueryTest with TestHiveSingleton wit
sqlContext.sparkContext.conf.set("spark.speculation", "true")

// Uses a customized output committer which always fails
- configuration.set(
+ hadoopConfiguration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)

@@ -597,8 +597,8 @@ abstract class HadoopFsRelationTest extends QueryTest with TestHiveSingleton wit
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
- configuration.clear()
- clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ hadoopConfiguration.clear()
+ clonedConf.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
