
Commit

improve document
cloud-fan committed Aug 21, 2018
1 parent 844bd6f commit 9acda35
Showing 32 changed files with 97 additions and 92 deletions.
@@ -110,7 +110,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
* batches of Kafka data in a micro-batch streaming query.
*/
- override def createMicroBatchReadSupport(
+ override def getMicroBatchReadSupport(
metadataPath: String,
options: DataSourceOptions): KafkaMicroBatchReadSupport = {

@@ -151,7 +151,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
* Kafka data in a continuous streaming query.
*/
- override def createContinuousReadSupport(
+ override def getContinuousReadSupport(
metadataPath: String,
options: DataSourceOptions): KafkaContinuousReadSupport = {
val parameters = options.asMap().asScala.toMap
@@ -267,7 +267,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}

- override def createStreamingWriteSupport(
+ override def getStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
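For illustration (not part of this commit's diff): the renamed Kafka provider entry points above are only ever invoked indirectly, when a streaming query reads from Kafka. A minimal Scala usage sketch, assuming an active SparkSession named spark, a broker at localhost:9092, and a topic named "events" (all assumed values):

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("subscribe", "events")                       // assumed topic name
  .load()
// Kafka exposes key/value as binary; cast them to strings for downstream use.
val messages = kafkaDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")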
@@ -675,7 +675,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
- val readSupport = provider.createMicroBatchReadSupport(
+ val readSupport = provider.getMicroBatchReadSupport(
dir.getAbsolutePath, new DataSourceOptions(options.asJava))
val config = readSupport.newScanConfigBuilder(
KafkaSourceOffset(Map(tp -> 0L)),
@@ -26,14 +26,14 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for batch processing.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
+ * This interface is used to return {@link BatchReadSupport} instances when end users run
* {@code SparkSession.read.format(...).option(...).load()}.
*/
@InterfaceStability.Evolving
public interface BatchReadSupportProvider extends DataSourceV2 {

/**
- * Creates a {@link BatchReadSupport} to scan the data from this data source with a user
+ * Returns a {@link BatchReadSupport} instance to load the data from this data source with a user
* specified schema.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
@@ -43,15 +43,15 @@ public interface BatchReadSupportProvider extends DataSourceV2 {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
+ default BatchReadSupport getBatchReadSupport(StructType schema, DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}

/**
- * Creates a {@link BatchReadSupport} to scan the data from this data source.
+ * Returns a {@link BatchReadSupport} instance to scan the data from this data source.
*
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- BatchReadSupport createBatchReadSupport(DataSourceOptions options);
+ BatchReadSupport getBatchReadSupport(DataSourceOptions options);
}
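As a sketch of the call path described in the updated javadoc (the source name and option below are hypothetical; an active SparkSession named spark is assumed), load() resolves the provider and dispatches to getBatchReadSupport:

import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Without a user-specified schema -> getBatchReadSupport(options)
val df = spark.read
  .format("com.example.MyBatchSource") // hypothetical BatchReadSupportProvider implementation
  .option("path", "/tmp/input")        // hypothetical option
  .load()

// With a user-specified schema -> getBatchReadSupport(schema, options), which throws
// UnsupportedOperationException unless the provider overrides the default method.
val dfWithSchema = spark.read
  .format("com.example.MyBatchSource")
  .schema(new StructType().add("id", LongType).add("name", StringType))
  .load()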
@@ -28,15 +28,16 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data writing ability for batch processing.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
+ * This interface is used to return {@link BatchWriteSupport} instances when end users run
* {@code Dataset.write.format(...).option(...).save()}.
*/
@InterfaceStability.Evolving
public interface BatchWriteSupportProvider extends DataSourceV2 {

/**
- * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
- * sources can return None if there is no writing needed to be done according to the save mode.
+ * Returns an optional {@link BatchWriteSupport} instance to save the data to this data source.
+ * Data sources can return None if there is no writing needed to be done according to the save
+ * mode.
*
* @param queryId A unique string for the writing query. It's possible that there are many
* writing queries running at the same time, and the returned
@@ -48,7 +49,7 @@ public interface BatchWriteSupportProvider extends DataSourceV2 {
* case-insensitive string-to-string map.
* @return a write support to write data to this data source.
*/
- Optional<BatchWriteSupport> createBatchWriteSupport(
+ Optional<BatchWriteSupport> getBatchWriteSupport(
String queryId,
StructType schema,
SaveMode mode,
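A corresponding write-side sketch (hypothetical sink name and option; df is assumed to be an existing DataFrame): save() ends up in getBatchWriteSupport(queryId, schema, mode, options), and per the javadoc the provider may answer with an empty Optional when the save mode requires no work.

import org.apache.spark.sql.SaveMode

df.write
  .format("com.example.MyBatchSink") // hypothetical BatchWriteSupportProvider implementation
  .mode(SaveMode.Append)
  .option("path", "/tmp/output")     // hypothetical option
  .save()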
@@ -26,15 +26,15 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for continuous stream processing.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
- * {@code SparkSession.readStream.format(...).option(...).load()}.
+ * This interface is used to return {@link ContinuousReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
*/
@InterfaceStability.Evolving
public interface ContinuousReadSupportProvider extends DataSourceV2 {

/**
- * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source with
- * a user specified schema.
+ * Returns a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+ * source with a user specified schema.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user specified schema.
@@ -46,23 +46,24 @@ public interface ContinuousReadSupportProvider extends DataSourceV2 {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- default ContinuousReadSupport createContinuousReadSupport(
+ default ContinuousReadSupport getContinuousReadSupport(
StructType schema,
String checkpointLocation,
DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}

/**
- * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source.
+ * Returns a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+ * source.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Readers for the same logical source in the same query
* will be given the same checkpointLocation.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- ContinuousReadSupport createContinuousReadSupport(
+ ContinuousReadSupport getContinuousReadSupport(
String checkpointLocation,
DataSourceOptions options);
}
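A sketch of the continuous-trigger path mentioned in the updated javadoc (hypothetical source name and checkpoint path; an active SparkSession named spark is assumed): Trigger.Continuous routes planning through getContinuousReadSupport(checkpointLocation, options).

import org.apache.spark.sql.streaming.Trigger

val continuousQuery = spark.readStream
  .format("com.example.MyContinuousSource") // hypothetical ContinuousReadSupportProvider
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))  // continuous processing trigger
  .option("checkpointLocation", "/tmp/checkpoints/continuous-demo") // hypothetical path
  .start()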
@@ -26,15 +26,15 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for micro-batch stream processing.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
- * {@code SparkSession.readStream.format(...).option(...).load()}.
+ * This interface is used to return {@link MicroBatchReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a micro-batch trigger.
*/
@InterfaceStability.Evolving
public interface MicroBatchReadSupportProvider extends DataSourceV2 {

/**
- * Creates a {@link MicroBatchReadSupport} to scan the data from this streaming data source with
- * a user specified schema.
+ * Returns a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+ * source with a user specified schema.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user specified schema.
@@ -46,23 +46,24 @@ public interface MicroBatchReadSupportProvider extends DataSourceV2 {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- default MicroBatchReadSupport createMicroBatchReadSupport(
+ default MicroBatchReadSupport getMicroBatchReadSupport(
StructType schema,
String checkpointLocation,
DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}

/**
- * Creates a {@link MicroBatchReadSupport} to scan the data from this streaming data source.
+ * Returns a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+ * source.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Readers for the same logical source in the same query
* will be given the same checkpointLocation.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- MicroBatchReadSupport createMicroBatchReadSupport(
+ MicroBatchReadSupport getMicroBatchReadSupport(
String checkpointLocation,
DataSourceOptions options);
}
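And the micro-batch counterpart (again a hypothetical source name and checkpoint path, with an active SparkSession named spark assumed): with a processing-time trigger the engine calls getMicroBatchReadSupport(checkpointLocation, options) once when the query starts.

import org.apache.spark.sql.streaming.Trigger

val microBatchQuery = spark.readStream
  .format("com.example.MyMicroBatchSource") // hypothetical MicroBatchReadSupportProvider
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("checkpointLocation", "/tmp/checkpoints/microbatch-demo") // hypothetical path
  .start()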
@@ -27,14 +27,14 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data writing ability for structured streaming.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
- * {@code Dataset.writeStream.format(...).option(...).save()}.
+ * This interface is used to return {@link StreamingWriteSupport} instances when end users run
+ * {@code Dataset.writeStream.format(...).option(...).start()}.
*/
@InterfaceStability.Evolving
public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {

/**
- * Creates a {@link StreamingWriteSupport} to save the data to this data source.
+ * Returns a {@link StreamingWriteSupport} instance to save the data to this data source.
*
* @param queryId A unique string for the writing query. It's possible that there are many
* writing queries running at the same time, and the returned
@@ -45,7 +45,7 @@ public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreami
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
*/
- StreamingWriteSupport createStreamingWriteSupport(
+ StreamingWriteSupport getStreamingWriteSupport(
String queryId,
StructType schema,
OutputMode mode,
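On the write side (hypothetical sink name and checkpoint path; streamingDf is assumed to be a streaming DataFrame): start(), not save(), begins the query, and the engine obtains the writer via getStreamingWriteSupport(queryId, schema, outputMode, options).

val sinkQuery = streamingDf.writeStream
  .format("com.example.MyStreamingSink") // hypothetical StreamingWriteSupportProvider
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/streaming-sink-demo") // hypothetical path
  .start()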
@@ -20,13 +20,14 @@
import org.apache.spark.annotation.InterfaceStability;

/**
- * An interface that defines how to scan the data from data source for batch processing.
+ * An interface that defines how to load the data from data source for batch processing.
*
- * The execution engine will create an instance of this interface at the start of a batch query,
- * then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}. The
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
+ * query, then call {@link #newScanConfigBuilder()} to create an instance of {@link ScanConfig}. The
* {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
* {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
- * factory to process data from the data source.
+ * factory to scan data from the data source.
*/
@InterfaceStability.Evolving
public interface BatchReadSupport extends ReadSupport {
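An engine-side sketch of the lifecycle described above, using only calls that appear in this diff plus an assumed ScanConfigBuilder.build(); provider and options are stand-ins for values the engine already holds:

// 1. Obtain the read support from a BatchReadSupportProvider.
val readSupport = provider.getBatchReadSupport(options)
// 2. Build a ScanConfig; operator pushdown is applied and recorded here. build() assumed.
val scanConfig = readSupport.newScanConfigBuilder().build()
// 3. Plan input partitions from the pushdown result; one read task per partition.
val partitions = readSupport.planInputPartitions(scanConfig)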
@@ -22,7 +22,6 @@
import org.apache.spark.annotation.InterfaceStability;

/**
- *
* A serializable representation of an input partition returned by
* {@link ReadSupport#planInputPartitions(ScanConfig)}.
*
@@ -24,16 +24,17 @@
import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;

/**
- * An interface that defines how to scan the data from data source for continuous streaming
+ * An interface that defines how to load the data from data source for continuous streaming
* processing.
*
- * The execution engine will create an instance of this interface at the start of a streaming query,
- * then call {@link #newScanConfigBuilder(Offset)} and create an instance of {@link ScanConfig} for
- * the duration of the streaming query or until {@link #needsReconfiguration(ScanConfig)} is true.
- * The {@link ScanConfig} will be used to create input partitions and reader factory to process data
- * for its duration. At the end {@link #stop()} will be called when the streaming execution is
- * completed. Note that a single query may have multiple executions due to restart or failure
- * recovery.
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
+ * streaming query, then call {@link #newScanConfigBuilder(Offset)} to create an instance of
+ * {@link ScanConfig} for the duration of the streaming query or until
+ * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
+ * input partitions and reader factory to scan data for its duration. At the end {@link #stop()}
+ * will be called when the streaming execution is completed. Note that a single query may have
+ * multiple executions due to restart or failure recovery.
*/
@InterfaceStability.Evolving
public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
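A rough sketch of the reconfiguration loop described above (hedged: ScanConfigBuilder.build(), the Offset import location, and the offset/teardown plumbing are assumptions, not shown in this diff):

import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, Offset} // Offset location assumed

def continuousLoop(
    readSupport: ContinuousReadSupport,
    startOffset: Offset,                          // recovered from the checkpoint by the engine
    queryIsActive: () => Boolean,                 // assumed engine-side predicate
    offsetAfterTeardown: () => Offset): Unit = {  // assumed offset lookup after task teardown
  var scanConfig = readSupport.newScanConfigBuilder(startOffset).build() // build() assumed
  var partitions = readSupport.planInputPartitions(scanConfig)
  // ... launch long-running tasks for `partitions` ...
  while (queryIsActive()) {
    if (readSupport.needsReconfiguration(scanConfig)) {
      // Tear down the running tasks, then re-plan with a fresh ScanConfig.
      scanConfig = readSupport.newScanConfigBuilder(offsetAfterTeardown()).build()
      partitions = readSupport.planInputPartitions(scanConfig)
    }
  }
  readSupport.stop() // called once when the streaming execution completes
}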
@@ -25,10 +25,11 @@
* An interface that defines how to scan the data from data source for micro-batch streaming
* processing.
*
- * The execution engine will create an instance of this interface at the start of a streaming query,
- * then call {@link #newScanConfigBuilder(Offset, Offset)} and create an instance of
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of a
+ * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} to create an instance of
* {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input
- * partitions and reader factory to process a micro-batch. At the end {@link #stop()} will be called
+ * partitions and reader factory to scan a micro-batch. At the end {@link #stop()} will be called
* when the streaming execution is completed. Note that a single query may have multiple executions
* due to restart or failure recovery.
*/
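Similarly, a per-micro-batch sketch (again assuming ScanConfigBuilder.build() and the Offset import location; the start/end offsets come from the engine's offset tracking, which this diff does not show):

import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset} // Offset location assumed

def planMicroBatch(readSupport: MicroBatchReadSupport, start: Offset, end: Offset): Unit = {
  // One ScanConfig per micro-batch, covering the offset range from start to end.
  val scanConfig = readSupport.newScanConfigBuilder(start, end).build() // build() assumed
  // One read task is scheduled per input partition; committing `end` happens elsewhere.
  val partitions = readSupport.planInputPartitions(scanConfig)
}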
@@ -251,7 +251,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}

} else {
- val writer = provider.createBatchWriteSupport(
+ val writer = provider.getBatchWriteSupport(
UUID.randomUUID().toString,
df.logicalPlan.output.toStructType,
mode,
@@ -152,16 +152,16 @@ object DataSourceV2Relation {
val v2Options = new DataSourceOptions(options.asJava)
userSpecifiedSchema match {
case Some(s) =>
- asReadSupportProvider.createBatchReadSupport(s, v2Options)
+ asReadSupportProvider.getBatchReadSupport(s, v2Options)
case _ =>
- asReadSupportProvider.createBatchReadSupport(v2Options)
+ asReadSupportProvider.getBatchReadSupport(v2Options)
}
}

def createWriteSupport(
options: Map[String, String],
schema: StructType): BatchWriteSupport = {
- asWriteSupportProvider.createBatchWriteSupport(
+ asWriteSupportProvider.getBatchWriteSupport(
UUID.randomUUID().toString,
schema,
SaveMode.Append,
@@ -94,7 +94,7 @@ class MicroBatchExecution(
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- val readSupport = dataSourceV2.createMicroBatchReadSupport(
+ val readSupport = dataSourceV2.getMicroBatchReadSupport(
metadataPath,
new DataSourceOptions(options.asJava))
nextSourceId += 1
@@ -496,7 +496,7 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case s: StreamingWriteSupportProvider =>
- val writer = s.createStreamingWriteSupport(
+ val writer = s.getStreamingWriteSupport(
s"$runId",
newAttributePlan.schema,
outputMode,
@@ -35,7 +35,7 @@ class ConsoleSinkProvider extends DataSourceV2
with DataSourceRegister
with CreatableRelationProvider {

- override def createStreamingWriteSupport(
+ override def getStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
@@ -148,7 +148,7 @@ class ContinuousExecution(
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1

- dataSource.createContinuousReadSupport(
+ dataSource.getContinuousReadSupport(
metadataPath,
new DataSourceOptions(extraReaderOptions.asJava))
}
@@ -185,7 +185,7 @@ class ContinuousExecution(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}

- val writer = sink.createStreamingWriteSupport(
+ val writer = sink.getStreamingWriteSupport(
s"$runId",
triggerLogicalPlan.schema,
outputMode,
@@ -117,7 +117,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa

// ContinuousReadSupportProvider implementation
// This is necessary because of how StreamTest finds the source for AddDataMemory steps.
- override def createContinuousReadSupport(
+ override def getContinuousReadSupport(
checkpointLocation: String,
options: DataSourceOptions): ContinuousReadSupport = this
}
@@ -42,7 +42,7 @@ case class ForeachWriteSupportProvider[T](
converter: Either[ExpressionEncoder[T], InternalRow => T])
extends StreamingWriteSupportProvider {

- override def createStreamingWriteSupport(
+ override def getStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
