[SPARK-21549][CORE] reverting sql-related changes not previously affected by the patch
szhem committed Oct 6, 2017
1 parent ff7b084 commit e41abc6
Showing 1 changed file with 0 additions and 125 deletions.
@@ -17,13 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.File
-import java.io.FilenameFilter
-
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.test.SharedSQLContext
 
 class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
@@ -36,123 +30,4 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
       assert(partFiles.length === 2)
     }
   }
-
-  test("write should fail when output path is not specified") {
-    val session = spark
-    import session.implicits._
-
-    val partitionCount = 5
-    val df = session
-      .range(100).as("id").repartition(partitionCount)
-      .withColumn("partition", $"id" % partitionCount)
-
-    val plan = df.queryExecution.sparkPlan
-    val output = null
-
-    assertThrows[IllegalArgumentException] {
-      SQLExecution.withNewExecutionId(session, df.queryExecution) {
-        FileFormatWriter.write(
-          sparkSession = session,
-          plan = plan,
-          fileFormat = new CSVFileFormat(),
-          committer = new SQLHadoopMapReduceCommitProtocol("job-1", output),
-          outputSpec = FileFormatWriter.OutputSpec(output, Map.empty),
-          hadoopConf = session.sparkContext.hadoopConfiguration,
-          partitionColumns = plan.outputSet.find(_.name == "partition").toSeq,
-          bucketSpec = None,
-          statsTrackers = Seq.empty,
-          options = Map.empty
-        )
-      }
-    }
-  }
-
-  test("write should succeed when output path is specified") {
-    withTempPath { path =>
-      val session = spark
-      import session.implicits._
-
-      val partitionCount = 5
-      val df = spark
-        .range(100).as("id").repartition(partitionCount)
-        .withColumn("partition", $"id" % partitionCount)
-
-      val plan = df.queryExecution.sparkPlan
-      val output = path.getAbsolutePath
-
-      SQLExecution.withNewExecutionId(session, df.queryExecution) {
-        FileFormatWriter.write(
-          sparkSession = session,
-          plan = plan,
-          fileFormat = new CSVFileFormat(),
-          committer = new SQLHadoopMapReduceCommitProtocol("job-1", output),
-          outputSpec = FileFormatWriter.OutputSpec(output, Map.empty),
-          hadoopConf = session.sparkContext.hadoopConfiguration,
-          partitionColumns = plan.outputSet.find(_.name == "partition").toSeq,
-          bucketSpec = None,
-          statsTrackers = Seq.empty,
-          options = Map.empty
-        )
-      }
-
-      val partitions = listPartitions(path)
-      partitions.foreach(partition => assert(partition.listFiles().nonEmpty))
-
-      val actualPartitions = partitions.map(_.getName)
-      val expectedPartitions = (0 until partitionCount).map(partition => s"partition=$partition")
-      assert(actualPartitions.toSet === expectedPartitions.toSet)
-    }
-  }
-
-  test("write should succeed when custom partition locations are specified") {
-    withTempPath { path =>
-      val session = spark
-      import session.implicits._
-
-      val partitionCount = 5
-      val df = session
-        .range(100).as("id").repartition(partitionCount)
-        .withColumn("partition", $"id" % partitionCount)
-
-      val customPartitionLocations = (0 until partitionCount)
-        .foldLeft(Map[TablePartitionSpec, String]()) { (locations, partition) =>
-          val partitionSpec = Map("partition" -> partition.toString)
-          val partitionPath = new File(path, partition.toString).getAbsolutePath
-          locations + (partitionSpec -> partitionPath)
-        }
-
-      val plan = df.queryExecution.sparkPlan
-      val output = path.getAbsolutePath
-
-      SQLExecution.withNewExecutionId(session, df.queryExecution) {
-        FileFormatWriter.write(
-          sparkSession = session,
-          plan = plan,
-          fileFormat = new CSVFileFormat(),
-          committer = new SQLHadoopMapReduceCommitProtocol("job-1", output),
-          outputSpec = FileFormatWriter.OutputSpec(output, customPartitionLocations),
-          hadoopConf = session.sparkContext.hadoopConfiguration,
-          partitionColumns = plan.outputSet.find(_.name == "partition").toSeq,
-          bucketSpec = None,
-          statsTrackers = Seq.empty,
-          options = Map.empty
-        )
-      }
-
-      val partitions = listPartitions(path)
-      partitions.foreach(partition => assert(partition.listFiles().nonEmpty))
-
-      val actualPartitions = partitions.map(_.getName)
-      val expectedPartitions = (0 until partitionCount).map(_.toString)
-      assert(actualPartitions.toSet === expectedPartitions.toSet)
-    }
-  }
-
-  private def listPartitions(path: File): Array[File] = {
-    path.listFiles(new FilenameFilter {
-      override def accept(dir: File, name: String): Boolean =
-        !(name.startsWith(".") || name.startsWith("_"))
-    })
-  }
-
 }
