# [SPARK-14474][SQL] Move FileSource offset log into checkpointLocation
## What changes were proposed in this pull request?

Now that we have a single location for storing checkpointed state, this PR propagates the checkpoint location into FileStreamSource so that its offset log no longer sits off on its own in a separate location.
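
For illustration only, a minimal sketch of the naming convention this change introduces (it mirrors the `ContinuousQueryManager` hunk below; `sourceMetadataPath` is a hypothetical helper, not code from this patch): every source a query materializes gets its own numbered metadata directory under the query's checkpoint location, instead of the old `$path/_metadata` default.

```scala
// Hypothetical helper showing the convention; the real code inlines this in
// ContinuousQueryManager as s"$checkpointLocation/sources/$nextSourceId".
def sourceMetadataPath(checkpointLocation: String, sourceId: Long): String =
  s"$checkpointLocation/sources/$sourceId"

// A query checkpointed at "/chk" with two sources keeps their logs at
// "/chk/sources/0" and "/chk/sources/1" (see the "source metadataPath" test).
```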

## How was this patch tested?

test("metadataPath should be in checkpointLocation")

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12247 from zsxwing/file-source-log-location.
zsxwing authored and marmbrus committed Apr 12, 2016
1 parent da60b34 commit 6bf6921
Showing 8 changed files with 141 additions and 33 deletions.
@@ -178,10 +178,13 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
}
var nextSourceId = 0L
val logicalPlan = df.logicalPlan.transform {
case StreamingRelation(dataSource, _, output) =>
// Materialize source to avoid creating it in every batch
val source = dataSource.createSource()
val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
val source = dataSource.createSource(metadataPath)
nextSourceId += 1
// We still need to use the previous `output` instead of `source.schema` as attributes in
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)
@@ -123,36 +123,58 @@ case class DataSource(
}
}

/** Returns a source that can be used to continually read data. */
def createSource(): Source = {
private def inferFileFormatSchema(format: FileFormat): StructType = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val allPaths = caseInsensitiveOptions.get("path")
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray

val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
userSpecifiedSchema.orElse {
format.inferSchema(
sqlContext,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
throw new AnalysisException("Unable to infer schema. It must be specified manually.")

jaceklaskowski (Contributor) commented on Apr 15, 2016: @zsxwing @marmbrus Two spaces?
}
}

/** Returns the name and schema of the source that can be used to continually read data. */
def sourceSchema(): (String, StructType) = {
providingClass.newInstance() match {
case s: StreamSourceProvider =>
s.createSource(sqlContext, userSpecifiedSchema, className, options)
s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)

case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
(s"FileSource[$path]", inferFileFormatSchema(format))
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
}
}

val allPaths = caseInsensitiveOptions.get("path")
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
/** Returns a source that can be used to continually read data. */
def createSource(metadataPath: String): Source = {
providingClass.newInstance() match {
case s: StreamSourceProvider =>
s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)

val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sqlContext,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
throw new AnalysisException("Unable to infer schema. It must be specified manually.")
}
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})

val dataSchema = inferFileFormatSchema(format)

def dataFrameBuilder(files: Array[String]): DataFrame = {
Dataset.ofRows(
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.datasources.DataSource

object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
val source = dataSource.createSource()
StreamingRelation(dataSource, source.toString, source.schema.toAttributes)
val (name, schema) = dataSource.sourceSchema()
StreamingRelation(dataSource, name, schema.toAttributes)
}
}

@@ -129,8 +129,17 @@ trait SchemaRelationProvider {
* Implemented by objects that can produce a streaming [[Source]] for a specific format or system.
*/
trait StreamSourceProvider {

/** Returns the name and schema of the source that can be used to continually read data. */
def sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType)

def createSource(
sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source
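
As a hedged sketch (not part of this commit), a third-party provider written against the revised trait might look like the following; `MyStreamProvider` and `buildMySource` are hypothetical names, and the imports reflect the package layout at this point in Spark's history. `sourceSchema` is called while the streaming DataFrame is being defined, so it only has to report a name and schema; `createSource` is called when the query starts and now receives a `metadataPath` that lives inside the query's checkpoint location.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical provider, shown only to illustrate the two-method contract.
class MyStreamProvider extends StreamSourceProvider {

  private val defaultSchema = StructType(StructField("value", IntegerType) :: Nil)

  // Invoked while the streaming DataFrame is being defined: Spark only needs
  // a display name and a schema here, so no Source is materialized yet.
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    ("MySource", schema.getOrElse(defaultSchema))

  // Invoked once the query starts. `metadataPath` now points inside the
  // query's checkpointLocation (e.g. <checkpoint>/sources/<N>), so any
  // per-source log should be kept there rather than next to the data.
  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    buildMySource(sqlContext, metadataPath, schema.getOrElse(defaultSchema))

  // Hypothetical helper; a real provider would construct its concrete Source here.
  private def buildMySource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: StructType): Source = ???
}
```

Keeping any per-source log under `metadataPath` is what lets the FileStreamSource offset log move into the checkpoint directory.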
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql._
@@ -31,22 +32,50 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils

object LastOptions {

var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
var parameters: Map[String, String] = null
var schema: Option[StructType] = null
var partitionColumns: Seq[String] = Nil

def clear(): Unit = {
parameters = null
schema = null
partitionColumns = null
reset(mockStreamSourceProvider)
reset(mockStreamSinkProvider)
}
}

/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
class DefaultSource extends StreamSourceProvider with StreamSinkProvider {

private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)

override def sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
LastOptions.parameters = parameters
LastOptions.schema = schema
LastOptions.mockStreamSourceProvider.sourceSchema(sqlContext, schema, providerName, parameters)
("dummySource", fakeSchema)
}

override def createSource(
sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
LastOptions.parameters = parameters
LastOptions.schema = schema
LastOptions.mockStreamSourceProvider.createSource(
sqlContext, metadataPath, schema, providerName, parameters)
new Source {
override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
override def schema: StructType = fakeSchema

override def getOffset: Option[Offset] = Some(new LongOffset(0))

@@ -64,6 +93,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
partitionColumns: Seq[String]): Sink = {
LastOptions.parameters = parameters
LastOptions.partitionColumns = partitionColumns
LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
new Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {}
}
@@ -117,7 +147,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("opt2") == "2")
assert(LastOptions.parameters("opt3") == "3")

LastOptions.parameters = null
LastOptions.clear()

df.write
.format("org.apache.spark.sql.streaming.test")
@@ -181,7 +211,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B

assert(LastOptions.parameters("path") == "/test")

LastOptions.parameters = null
LastOptions.clear()

df.write
.format("org.apache.spark.sql.streaming.test")
@@ -204,7 +234,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("boolOpt") == "false")
assert(LastOptions.parameters("doubleOpt") == "6.7")

LastOptions.parameters = null
LastOptions.clear()
df.write
.format("org.apache.spark.sql.streaming.test")
.option("intOpt", 56)
@@ -303,4 +333,39 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B

assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
}

test("source metadataPath") {
LastOptions.clear()

val checkpointLocation = newMetadataDir

val df1 = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
.stream()

val df2 = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
.stream()

val q = df1.union(df2).write
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", checkpointLocation)
.trigger(ProcessingTime(10.seconds))
.startStream()
q.stop()

verify(LastOptions.mockStreamSourceProvider).createSource(
sqlContext,
checkpointLocation + "/sources/0",
None,
"org.apache.spark.sql.streaming.test",
Map.empty)

verify(LastOptions.mockStreamSourceProvider).createSource(
sqlContext,
checkpointLocation + "/sources/1",
None,
"org.apache.spark.sql.streaming.test",
Map.empty)
}
}
@@ -63,6 +63,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
format: String,
path: String,
schema: Option[StructType] = None): FileStreamSource = {
val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
val reader =
if (schema.isDefined) {
sqlContext.read.format(format).schema(schema.get)
@@ -72,7 +73,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
reader.stream(path)
.queryExecution.analyzed
.collect { case StreamingRelation(dataSource, _, _) =>
dataSource.createSource().asInstanceOf[FileStreamSource]
// There is only one source in our tests so just set sourceId to 0
dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
}.head
}

@@ -98,9 +100,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
df.queryExecution.analyzed
.collect { case StreamingRelation(dataSource, _, _) =>
dataSource.createSource().asInstanceOf[FileStreamSource]
}.head
.schema
dataSource.sourceSchema()
}.head._2
}

test("FileStreamSource schema: no path") {
@@ -340,7 +341,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
}

}

class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
@@ -59,7 +59,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
}

test("error if attempting to resume specific checkpoint") {
val location = Utils.createTempDir("steaming.checkpoint").getCanonicalPath
val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath

val input = MemoryStream[Int]
val query = input.toDF().write
@@ -115,8 +115,17 @@ class StreamSuite extends StreamTest with SharedSQLContext {
*/
class FakeDefaultSource extends StreamSourceProvider {

private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)

override def sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)

override def createSource(
sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
