
Commit

Added another test
tdas committed Dec 20, 2016
1 parent a6a3677 · commit b5f216e
Showing 2 changed files with 29 additions and 7 deletions.
DataStreamWriter.scala

@@ -71,7 +71,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         OutputMode.Update
       case _ =>
         throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
-          "Accepted output modes are 'append' and 'complete'")
+          "Accepted output modes are 'append', 'complete', 'update'")
     }
     this
   }
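For context: the hunk above changes only the error message in DataStreamWriter's outputMode(String) setter, which maps a case-insensitive mode string to an OutputMode and now advertises 'update' alongside 'append' and 'complete'. Below is a minimal sketch of the surrounding method, reconstructed from this hunk and the tests further down; the exact lowercasing and case labels are assumptions rather than verbatim source:

// Sketch of the enclosing method (reconstructed, not verbatim source).
def outputMode(outputMode: String): DataStreamWriter[T] = {
  this.outputMode = outputMode.toLowerCase match {
    case "append" =>
      OutputMode.Append     // write only rows newly appended to the result table
    case "complete" =>
      OutputMode.Complete   // rewrite the entire result table on every trigger
    case "update" =>
      OutputMode.Update     // write only rows changed since the last trigger
    case _ =>
      throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
        "Accepted output modes are 'append', 'complete', 'update'")
  }
  this
}

Returning this keeps the setter chainable, and the case-insensitive match is what lets the new test pass both "update" and "Update".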
DataStreamReaderWriterSuite.scala

@@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration._

 import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester.PrivateMethod

 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
+import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

@@ -105,7 +106,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
   }
 }

-class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
+class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with PrivateMethodTester {

   private def newMetadataDir =
     Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -388,18 +389,39 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {

   private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath

-  test("check outputMode(string) throws exception on unsupported modes") {
+  test("supported strings in outputMode(string)") {
+    val outputModeMethod = PrivateMethod[OutputMode]('outputMode)
+
+    def testMode(outputMode: String, expected: OutputMode): Unit = {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load()
+      val w = df.writeStream
+      w.outputMode(outputMode)
+      val setOutputMode = w invokePrivate outputModeMethod()
+      assert(setOutputMode === expected)
+    }
+
+    testMode("append", OutputMode.Append)
+    testMode("Append", OutputMode.Append)
+    testMode("complete", OutputMode.Complete)
+    testMode("Complete", OutputMode.Complete)
+    testMode("update", OutputMode.Update)
+    testMode("Update", OutputMode.Update)
+  }
+
+  test("unsupported strings in outputMode(string)") {
     def testError(outputMode: String): Unit = {
+      val acceptedModes = Seq("append", "update", "complete")
       val df = spark.readStream
         .format("org.apache.spark.sql.streaming.test")
         .load()
       val w = df.writeStream
       val e = intercept[IllegalArgumentException](w.outputMode(outputMode))
-      Seq("output mode", "unknown", outputMode).foreach { s =>
+      (Seq("output mode", "unknown", outputMode) ++ acceptedModes).foreach { s =>
         assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
       }
     }
-    testError("Update")
     testError("Xyz")
   }

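The new "supported strings" test reads the writer's private outputMode state through ScalaTest's PrivateMethodTester instead of widening the production API. Here is a self-contained sketch of that pattern; the Greeter class and its greeting method are hypothetical, purely for illustration:

import org.scalatest.{FunSuite, PrivateMethodTester}

class Greeter {
  private def greeting(name: String): String = s"hello, $name"
}

class GreeterSuite extends FunSuite with PrivateMethodTester {
  test("invoke a private method by name") {
    // A PrivateMethod handle is typed by the return type and named by a Symbol.
    val greetingMethod = PrivateMethod[String]('greeting)
    val g = new Greeter
    // invokePrivate calls the private method reflectively with the given arguments.
    assert((g invokePrivate greetingMethod("spark")) === "hello, spark")
  }
}

In the suite above, the analogous handle PrivateMethod[OutputMode]('outputMode) is invoked with no arguments to read back the mode the builder stored, so each accepted string can be checked against its parsed OutputMode without adding a public getter.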
