
Commit

Remove FileStreamSink; rewrite tests using testStream; fix a bug about startId; address other comments
zsxwing committed Feb 3, 2016
1 parent a2784ff commit 6a90c55
Showing 8 changed files with 93 additions and 207 deletions.
@@ -100,7 +100,6 @@ class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {
     val resolved = ResolvedDataSource.createSource(
       sqlContext,
       userSpecifiedSchema = userSpecifiedSchema,
-      partitionColumns = Array.empty[String],
       providerName = source,
       options = extraOptions.toMap)
     DataFrame(sqlContext, StreamingRelation(resolved))

@@ -95,7 +95,6 @@ object ResolvedDataSource extends Logging {
   def createSource(
       sqlContext: SQLContext,
       userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
       providerName: String,
       options: Map[String, String]): Source = {
     val provider = lookupDataSource(providerName).newInstance() match {
@@ -105,20 +104,7 @@
           s"Data source $providerName does not support streamed reading")
     }
 
-    userSpecifiedSchema match {
-      case Some(schema) => {
-        val maybePartitionsSchema = if (partitionColumns.isEmpty) {
-          None
-        } else {
-          Some(partitionColumnsSchema(
-            schema, partitionColumns, sqlContext.conf.caseSensitiveAnalysis))
-        }
-        val dataSchema =
-          StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
-        provider.createSource(sqlContext, Some(dataSchema), maybePartitionsSchema, options)
-      }
-      case None => provider.createSource(sqlContext, None, None, options)
-    }
+    provider.createSource(sqlContext, userSpecifiedSchema, options)
   }
 
   def createSink(

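With the resolver no longer splitting the user-specified schema, that split becomes each provider's concern. A rough sketch, not from this commit (splitSchema is a hypothetical helper, and case-insensitive column matching is omitted), of how a provider that still distinguishes partition columns could mirror the removed logic:

import org.apache.spark.sql.types.StructType

// Hypothetical helper: separate partition columns from the data schema, mirroring the
// splitting removed from ResolvedDataSource.createSource above.
def splitSchema(
    schema: StructType,
    partitionColumns: Seq[String]): (StructType, Option[StructType]) = {
  val (partitionFields, dataFields) = schema.partition(f => partitionColumns.contains(f.name))
  val dataSchema = StructType(dataFields).asNullable
  val partitionSchema = if (partitionFields.isEmpty) None else Some(StructType(partitionFields))
  (dataSchema, partitionSchema)
}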

This file was deleted.

@@ -65,17 +65,21 @@ class FileStreamSource(
     new LongOffset(maxBatchFile)
   }
 
+  def currentOffset: LongOffset = synchronized {
+    new LongOffset(maxBatchFile)
+  }
+
   /**
    * Returns the next batch of data that is available after `start`, if any is available.
    */
   override def getNextBatch(start: Option[Offset]): Option[Batch] = {
-    val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(0L)
+    val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
     val end = fetchMaxOffset()
     val endId = end.offset
 
-    val batchFiles = (startId to endId).filter(_ >= 0).map(i => s"$metadataPath/$i")
-    if (!(batchFiles.isEmpty || start == Some(end))) {
-      logDebug(s"Producing files from batches $start:$endId")
+    val batchFiles = (startId + 1 to endId).filter(_ >= 0).map(i => s"$metadataPath/$i")
+    if (batchFiles.nonEmpty) {
+      logDebug(s"Producing files from batches ${startId + 1}:$endId")
       logDebug(s"Batch files: $batchFiles")
 
       // Probably does not need to be a spark job...
@@ -91,10 +95,6 @@
     }
   }
 
-  def restart(): FileStreamSource = {
-    new FileStreamSource(sqlContext, metadataPath, path, dataSchema, dataFrameBuilder)
-  }
-
   private def sparkContext = sqlContext.sparkContext
 
   private val fs = FileSystem.get(sparkContext.hadoopConfiguration)

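The startId change above is the off-by-one fix mentioned in the commit message. A small illustration, not part of the commit (batchesAfter and the literal offsets are made up for the example), of the intended semantics that start is exclusive:

import org.apache.spark.sql.execution.streaming.LongOffset

// With the old default of 0 and the inclusive range (startId to endId), restarting from
// offset 0 would replay batch 0. Defaulting to -1 and ranging over (startId + 1 to endId)
// returns exactly the batches after `start`.
def batchesAfter(start: Option[LongOffset], endId: Long): Seq[Long] = {
  val startId = start.map(_.offset).getOrElse(-1L)
  (startId + 1 to endId).filter(_ >= 0)
}

// batchesAfter(None, 2)                     == Seq(0L, 1L, 2L)  // first run: everything
// batchesAfter(Some(new LongOffset(0)), 2)  == Seq(1L, 2L)      // batch 0 is not replayed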
@@ -173,23 +173,6 @@ class StreamExecution(
     logDebug(s"Waiting for data, current: $streamProgress")
   }
 
-  /** Clears the indicator that a batch has completed. Used for testing. */
-  def clearBatchMarker(): Unit = {
-    batchRun = false
-  }
-
-  /**
-   * Awaits the completion of at least one streaming batch. Must be called after `clearBatchMarker`
-   * to guarantee that a new batch has been processed.
-   */
-  def awaitBatchCompletion(): Unit = {
-    while (!batchRun) {
-      awaitBatchLock.synchronized {
-        awaitBatchLock.wait(100)
-      }
-    }
-  }
-
   /**
    * Signals to the thread executing micro-batches that it should stop running after the next
    * batch. This method blocks until the thread stops running.

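These test-only hooks go away because the suites are now driven through testStream (per the commit message), which waits for batch completion itself. A rough sketch of that style, assuming the StreamTest helper with MemoryStream, AddData and CheckAnswer from the same patch series; MyStreamSuite, the test body, and the import paths are approximations of the code layout at the time:

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.test.SharedSQLContext

class MyStreamSuite extends StreamTest with SharedSQLContext {
  import testImplicits._

  test("map over a memory stream") {
    val input = MemoryStream[Int]
    val mapped = input.toDS().map(_ + 1)

    // testStream runs the actions in order and blocks until each batch has been processed,
    // so the suite never needs clearBatchMarker/awaitBatchCompletion.
    testStream(mapped)(
      AddData(input, 1, 2, 3),
      CheckAnswer(2, 3, 4))
  }
}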
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.streaming.{FileStreamSink, FileStreamSource, Sink, Source}
+import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -130,8 +130,7 @@ trait SchemaRelationProvider {
 trait StreamSourceProvider {
   def createSource(
       sqlContext: SQLContext,
-      partitionColumns: Option[StructType],
-      dataSchema: Option[StructType],
+      schema: Option[StructType],
       parameters: Map[String, String]): Source
 }
 
@@ -169,7 +168,7 @@ trait StreamSinkProvider {
  * @since 1.4.0
  */
 @Experimental
-trait HadoopFsRelationProvider extends StreamSourceProvider with StreamSinkProvider {
+trait HadoopFsRelationProvider extends StreamSourceProvider {
   /**
    * Returns a new base relation with the given parameters, a user defined schema, and a list of
    * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
@@ -199,8 +198,7 @@ trait HadoopFsRelationProvider extends StreamSourceProvider with StreamSinkProvider {
 
   override def createSource(
       sqlContext: SQLContext,
-      partitionColumns: Option[StructType],
-      dataSchema: Option[StructType],
+      schema: Option[StructType],
       parameters: Map[String, String]): Source = {
     val path = parameters("path")
     val metadataPath = parameters.getOrElse("metadataPath", s"$path/_metadata")
@@ -209,24 +207,14 @@ trait HadoopFsRelationProvider extends StreamSourceProvider with StreamSinkProvider {
       val relation = createRelation(
         sqlContext,
         files,
-        dataSchema,
-        partitionColumns,
+        schema,
+        partitionColumns = None,
         bucketSpec = None,
         parameters)
       DataFrame(sqlContext, LogicalRelation(relation))
     }
 
-    new FileStreamSource(sqlContext, metadataPath, path, dataSchema, dataFrameBuilder)
-  }
-
-  override def createSink(
-      sqlContext: SQLContext,
-      parameters: Map[String, String],
-      partitionColumns: Seq[String]): Sink = {
-    val path = parameters("path")
-    val metadataPath = parameters.getOrElse("metadataPath", s"$path/_metadata")
-
-    new FileStreamSink(sqlContext, metadataPath, path)
+    new FileStreamSource(sqlContext, metadataPath, path, schema, dataFrameBuilder)
   }
 }
 
@@ -33,11 +33,10 @@ object LastOptions {
 class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
   override def createSource(
       sqlContext: SQLContext,
-      partitionColumns: Option[StructType],
-      dataSchema: Option[StructType],
+      schema: Option[StructType],
       parameters: Map[String, String]): Source = {
     LastOptions.parameters = parameters
-    LastOptions.schema = dataSchema
+    LastOptions.schema = schema
     new Source {
       override def getNextBatch(start: Option[Offset]): Option[Batch] = None
       override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)

