Commit

Avoid getting call sites and cleaning closures
Andrew Or committed on May 20, 2015
1 parent 17e2943 · commit 10f7e3e
Showing 2 changed files with 50 additions and 52 deletions.
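This commit targets two per-RDD costs that add up when Spark SQL plans a scan over many partitions: computing each RDD's call site (which walks the current stack trace for the UI and error messages) and running the closure cleaner over the job-initialization function. The first file wraps the construction of SqlNewHadoopRDD in Utils.withDummyCallSite; the second drops an sc.clean call from SqlNewHadoopRDD itself.

Roughly how the wrapper works, as a hedged sketch (see org.apache.spark.util.Utils in Spark 1.4 for the real code; Utils and CallSite are private[spark], so this lives inside Spark's own source tree):

import org.apache.spark.SparkContext
import org.apache.spark.util.CallSite

// Approximate shape of Utils.withDummyCallSite: install empty call-site local
// properties for the duration of `body`, so RDDs constructed inside skip the
// stack-trace walk, then restore whatever was there before.
def withDummyCallSite[T](sc: SparkContext)(body: => T): T = {
  val oldShort = sc.getLocalProperty(CallSite.SHORT_FORM)
  val oldLong = sc.getLocalProperty(CallSite.LONG_FORM)
  try {
    sc.setLocalProperty(CallSite.SHORT_FORM, "")
    sc.setLocalProperty(CallSite.LONG_FORM, "")
    body
  } finally {
    sc.setLocalProperty(CallSite.SHORT_FORM, oldShort)
    sc.setLocalProperty(CallSite.LONG_FORM, oldLong)
  }
}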
@@ -33,14 +33,15 @@ import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
 
-import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
+import org.apache.spark.util.Utils
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -252,57 +253,58 @@ private[sql] class ParquetRelation2(
 
     val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
-    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
-    // footers. Especially when a global arbitrative schema (either from metastore or data source
-    // DDL) is available.
-    new SqlNewHadoopRDD(
-      sc = sqlContext.sparkContext,
-      broadcastedConf = broadcastedConf,
-      initDriverSideJobFuncOpt = Some(setInputPaths),
-      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-      inputFormatClass = classOf[FilteringParquetRowInputFormat],
-      keyClass = classOf[Void],
-      valueClass = classOf[Row]) {
-
-      val cacheMetadata = useMetadataCache
-
-      @transient val cachedStatuses = inputFiles.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
-        new FileStatus(
-          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
-      }.toSeq
-
-      @transient val cachedFooters = footers.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
-      }.toSeq
-
-      // Overridden so we can inject our own cached files statuses.
-      override def getPartitions: Array[SparkPartition] = {
-        val inputFormat = if (cacheMetadata) {
-          new FilteringParquetRowInputFormat {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
-          }
-        } else {
-          new FilteringParquetRowInputFormat
-        }
-
-        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
-        val rawSplits = inputFormat.getSplits(jobContext)
-
-        Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-        }
-      }
-    }.values
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
+      // footers. Especially when a global arbitrative schema (either from metastore or data source
+      // DDL) is available.
+      new SqlNewHadoopRDD(
+        sc = sqlContext.sparkContext,
+        broadcastedConf = broadcastedConf,
+        initDriverSideJobFuncOpt = Some(setInputPaths),
+        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+        inputFormatClass = classOf[FilteringParquetRowInputFormat],
+        keyClass = classOf[Void],
+        valueClass = classOf[Row]) {
+
+        val cacheMetadata = useMetadataCache
+
+        @transient val cachedStatuses = inputFiles.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+          new FileStatus(
+            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+        }.toSeq
+
+        @transient val cachedFooters = footers.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+        }.toSeq
+
+        // Overridden so we can inject our own cached files statuses.
+        override def getPartitions: Array[SparkPartition] = {
+          val inputFormat = if (cacheMetadata) {
+            new FilteringParquetRowInputFormat {
+              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+            }
+          } else {
+            new FilteringParquetRowInputFormat
+          }
+
+          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+          val rawSplits = inputFormat.getSplits(jobContext)
+
+          Array.tabulate[SparkPartition](rawSplits.size) { i =>
+            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          }
+        }
+      }.values
+    }
   }
 
   private class MetadataCache {
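The hunk above does not change what buildScan computes; it only wraps the construction of the SqlNewHadoopRDD (and the trailing .values projection, which builds another RDD) in Utils.withDummyCallSite, so none of the RDDs created during planning pay for a stack-trace walk. A minimal sketch of the same pattern, assuming code inside Spark where the private[spark] Utils is visible; buildPartitionScans is a hypothetical name, not part of the real API:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

// Hypothetical helper: construct one RDD per partition directory without
// computing a call site for each of them.
def buildPartitionScans(sc: SparkContext, partitionDirs: Seq[String]): Seq[RDD[String]] = {
  partitionDirs.map { dir =>
    Utils.withDummyCallSite(sc) {
      // Both RDDs built here (textFile and the filtered RDD derived from it)
      // see the dummy call site instead of triggering a stack-trace walk.
      sc.textFile(dir).filter(_.nonEmpty)
    }
  }
}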
@@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
   with SparkHadoopMapReduceUtil
   with Logging {
 
-  if (initLocalJobFuncOpt.isDefined) {
-    sc.clean(initLocalJobFuncOpt.get)
-  }
-
   protected def getJob(): Job = {
     val conf: Configuration = broadcastedConf.value.value
     // "new Job" will make a copy of the conf. Then, it is
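The second change removes the eager cleaning of initLocalJobFuncOpt in SqlNewHadoopRDD's constructor. SparkContext.clean delegates to ClosureCleaner, which nulls out outer references the closure does not use and checks that the result is serializable; for the already-serializable initialization function that ParquetRelation2 passes in, that pass was presumably pure overhead, and the caller is now expected to supply a serializable function. A self-contained sketch of the serializability check that cleaning buys (illustrative only; the real ClosureCleaner also rewrites the closure's outer references):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheckSketch {
  // The guarantee closure cleaning provides before a function is shipped to
  // executors: it must survive Java serialization.
  def ensureSerializable(f: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(f)
    catch {
      case e: NotSerializableException =>
        throw new IllegalArgumentException(s"Closure is not serializable: $e")
    }
  }

  def main(args: Array[String]): Unit = {
    // A plain function value captures nothing problematic, so skipping the
    // cleaning pass for it (as this commit does) is safe.
    val initJob: Int => Int = _ + 1
    ensureSerializable(initJob)
    println("closure is serializable")
  }
}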
