Roll forward "[SPARK-23096][SS] Migrate rate source to V2"
## What changes were proposed in this pull request?

Roll forward c68ec4e (#20688).

There are two minor test changes required:

* An error that used to be `TreeNodeException[ArithmeticException]` is no longer wrapped and is now just `ArithmeticException`.
* The test framework simply does not set the active Spark session (or rather, it doesn't do so early enough; I think it only happens when a query is analyzed). I've added the required logic to `SQLTestUtils`; see the sketch after this list.
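A minimal sketch of that second fix, assuming a test trait that owns the `SparkSession` (the trait and method names here are illustrative, not the literal `SQLTestUtils` patch):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper in the spirit of the SQLTestUtils fix: set the active
// session eagerly, instead of relying on query analysis to set it later.
trait ActiveSessionSetup {
  def spark: SparkSession

  protected def withActiveSession[T](body: => T): T = {
    SparkSession.setActiveSession(spark)
    try body
    finally SparkSession.clearActiveSession()
  }
}
```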

## How was this patch tested?

Existing tests.

Author: Jose Torres <torres.joseph.f+github@gmail.com>
Author: jerryshao <sshao@hortonworks.com>

Closes #20922 from jose-torres/ratefix.
jose-torres authored and cloud-fan committed Mar 30, 2018
1 parent b02e76c commit 5b5a36e
Showing 9 changed files with 524 additions and 663 deletions.
The service registration list replaces both of the old rate providers with the single new `RateStreamProvider`:

```diff
@@ -5,6 +5,5 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
-org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2
```
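This file is the standard Java `ServiceLoader` registry that Spark scans to resolve a source's short name; a provider participates by implementing `DataSourceRegister`. A minimal illustrative sketch (`DemoProvider` and its short name are hypothetical):

```scala
import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical provider: listing its class in the service file above is what
// lets ServiceLoader discover it; shortName() is what .format("...") matches.
class DemoProvider extends DataSourceRegister {
  override def shortName(): String = "demo"
}
```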
In `DataSource`, the imports pick up the new provider:

```diff
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
```
The backward-compatibility map in `object DataSource` gains an entry for the old rate source class name:

```diff
@@ -566,6 +566,7 @@ object DataSource extends Logging {
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
+    val rate = classOf[RateStreamProvider].getCanonicalName

     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
```
```diff
@@ -587,7 +588,8 @@
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
+      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
+      "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
   }
```
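With that mapping in place, both the short name and the old fully-qualified class name resolve to the new implementation. A usage sketch (assumes a `SparkSession` named `spark` is in scope):

```scala
// The short name resolves through DataSourceRegister; the old class name
// resolves through the backward-compatibility map added above.
val byShortName = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

val byOldClassName = spark.readStream
  .format("org.apache.spark.sql.execution.streaming.RateSourceProvider")
  .load()
```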


One file in the change set was deleted outright (its contents are not shown here).

In `RateStreamContinuousReader`, references to the removed classes move to `RateStreamProvider`:

```diff
@@ -24,8 +24,8 @@ import org.json4s.jackson.Serialization

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
+import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
```
```diff
@@ -40,8 +40,8 @@ class RateStreamContinuousReader(options: DataSourceOptions)

   val creationTime = System.currentTimeMillis()

-  val numPartitions = options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
-  val rowsPerSecond = options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+  val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
   val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble

   override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
```
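The per-partition rate is just the total rate spread evenly across partitions; with the defaults above (5 partitions, 6 rows per second), each partition emits 1.2 rows per second. A standalone sketch of the arithmetic:

```scala
// Defaults from the hunk above: 5 partitions, 6 rows/sec in total.
val numPartitions = 5
val rowsPerSecond = 6L
val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
assert(perPartitionRate == 1.2)
```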
```diff
@@ -57,12 +57,12 @@
     RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
   }

-  override def readSchema(): StructType = RateSourceProvider.SCHEMA
+  override def readSchema(): StructType = RateStreamProvider.SCHEMA

   private var offset: Offset = _

   override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
-    this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
+    this.offset = offset.orElse(createInitialOffset(numPartitions, creationTime))
   }

   override def getStartOffset(): Offset = offset
```
The `createInitialOffset` helper, formerly called on `RateStreamSourceV2`, is now defined in the reader itself:

```diff
@@ -98,6 +98,19 @@ class RateStreamContinuousReader(options: DataSourceOptions)
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}

+  private def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = {
+    RateStreamOffset(
+      Range(0, numPartitions).map { i =>
+        // Note that the starting offset is exclusive, so we have to decrement the starting value
+        // by the increment that will later be applied. The first row output in each
+        // partition will have a value equal to the partition index.
+        (i,
+          ValueRunTimeMsPair(
+            (i - numPartitions).toLong,
+            creationTimeMs))
+      }.toMap)
+  }
+
 }

 case class RateStreamContinuousDataReaderFactory(
```
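A quick standalone check of the start-offset arithmetic in `createInitialOffset`, assuming (as the comment in the diff states) that each partition's value advances by `numPartitions` per row:

```scala
// With numPartitions = 5, partition i starts at i - 5, so after one
// increment of numPartitions its first emitted value is exactly i.
val numPartitions = 5
val startValues = (0 until numPartitions).map(i => (i - numPartitions).toLong)
val firstEmitted = startValues.map(_ + numPartitions)
assert(firstEmitted == (0 until numPartitions).map(_.toLong))
```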
